matrix_sdk_base/
client.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#[cfg(feature = "e2e-encryption")]
17use std::sync::Arc;
18use std::{
19    collections::{BTreeMap, BTreeSet, HashMap},
20    fmt, iter,
21    ops::Deref,
22};
23
24use eyeball::{SharedObservable, Subscriber};
25use eyeball_im::{Vector, VectorDiff};
26use futures_util::Stream;
27#[cfg(feature = "e2e-encryption")]
28use matrix_sdk_crypto::{
29    store::DynCryptoStore, types::requests::ToDeviceRequest, CollectStrategy, DecryptionSettings,
30    EncryptionSettings, EncryptionSyncChanges, OlmError, OlmMachine, RoomEventDecryptionResult,
31    TrustRequirement,
32};
33#[cfg(feature = "e2e-encryption")]
34use ruma::events::{
35    room::{history_visibility::HistoryVisibility, message::MessageType},
36    SyncMessageLikeEvent,
37};
38#[cfg(doc)]
39use ruma::DeviceId;
40use ruma::{
41    api::client::{self as api, sync::sync_events::v5},
42    events::{
43        ignored_user_list::IgnoredUserListEvent,
44        marked_unread::MarkedUnreadEventContent,
45        push_rules::{PushRulesEvent, PushRulesEventContent},
46        room::{
47            member::{MembershipState, RoomMemberEventContent, SyncRoomMemberEvent},
48            power_levels::{
49                RoomPowerLevelsEvent, RoomPowerLevelsEventContent, StrippedRoomPowerLevelsEvent,
50            },
51        },
52        AnyRoomAccountDataEvent, AnyStrippedStateEvent, AnySyncEphemeralRoomEvent,
53        AnySyncMessageLikeEvent, AnySyncStateEvent, AnySyncTimelineEvent,
54        GlobalAccountDataEventType, StateEvent, StateEventType, SyncStateEvent,
55    },
56    push::{Action, PushConditionRoomCtx, Ruleset},
57    serde::Raw,
58    time::Instant,
59    OwnedRoomId, OwnedUserId, RoomId, RoomVersionId, UInt, UserId,
60};
61use tokio::sync::{broadcast, Mutex};
62#[cfg(feature = "e2e-encryption")]
63use tokio::sync::{RwLock, RwLockReadGuard};
64use tracing::{debug, error, info, instrument, trace, warn};
65
66#[cfg(feature = "e2e-encryption")]
67use crate::latest_event::{is_suitable_for_latest_event, LatestEvent, PossibleLatestEvent};
68#[cfg(feature = "e2e-encryption")]
69use crate::RoomMemberships;
70use crate::{
71    deserialized_responses::{DisplayName, RawAnySyncOrStrippedTimelineEvent, TimelineEvent},
72    error::{Error, Result},
73    event_cache::store::EventCacheStoreLock,
74    response_processors::AccountDataProcessor,
75    rooms::{
76        normal::{RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomMembersUpdate},
77        Room, RoomInfo, RoomState,
78    },
79    store::{
80        ambiguity_map::AmbiguityCache, BaseStateStore, DynStateStore, MemoryStore,
81        Result as StoreResult, StateChanges, StateStoreDataKey, StateStoreDataValue, StateStoreExt,
82        StoreConfig,
83    },
84    sync::{JoinedRoomUpdate, LeftRoomUpdate, Notification, RoomUpdates, SyncResponse, Timeline},
85    RoomStateFilter, SessionMeta,
86};
87
/// A no (network) IO client implementation.
///
/// This client is a state machine that receives responses and events and
/// accordingly updates its state. It is not designed to be used directly, but
/// rather through `matrix_sdk::Client`.
///
/// The type is [`Clone`]; see the individual fields for what a clone holds.
///
/// ```rust
/// use matrix_sdk_base::{store::StoreConfig, BaseClient};
///
/// let client = BaseClient::new(StoreConfig::new(
///     "cross-process-holder-name".to_owned(),
/// ));
/// ```
#[derive(Clone)]
pub struct BaseClient {
    /// The state store.
    ///
    /// Besides persisted state, this wrapper also exposes the session meta,
    /// the sync token and the known rooms.
    pub(crate) state_store: BaseStateStore,

    /// The store used by the event cache.
    ///
    /// Exposed to callers through [`BaseClient::event_cache_store`].
    event_cache_store: EventCacheStoreLock,

    /// The store used for encryption.
    ///
    /// This field is only meant to be used for `OlmMachine` initialization.
    /// All operations on it happen inside the `OlmMachine`.
    #[cfg(feature = "e2e-encryption")]
    crypto_store: Arc<DynCryptoStore>,

    /// The olm-machine that is created once the
    /// [`SessionMeta`][crate::session::SessionMeta] is set via
    /// [`BaseClient::activate`]
    ///
    /// It is `None` until then, and is replaced wholesale by
    /// [`BaseClient::regenerate_olm`].
    #[cfg(feature = "e2e-encryption")]
    olm_machine: Arc<RwLock<Option<OlmMachine>>>,

    /// Observable of when a user is ignored/unignored.
    ///
    /// NOTE(review): the observed value is a list of strings, presumably the
    /// ignored user IDs — confirm against the code that updates it.
    pub(crate) ignore_user_list_changes: SharedObservable<Vec<String>>,

    /// A sender that is used to communicate changes to room information. Each
    /// tick contains the room ID and the reasons that have generated this tick.
    pub(crate) room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,

    /// The strategy to use for picking recipient devices, when sending an
    /// encrypted message.
    #[cfg(feature = "e2e-encryption")]
    pub room_key_recipient_strategy: CollectStrategy,

    /// The trust requirement to use for decrypting events.
    ///
    /// [`BaseClient::new`] initialises it to [`TrustRequirement::Untrusted`].
    #[cfg(feature = "e2e-encryption")]
    pub decryption_trust_requirement: TrustRequirement,

    /// If the client should handle verification events received when syncing.
    ///
    /// [`BaseClient::new`] initialises it to `true`.
    #[cfg(feature = "e2e-encryption")]
    pub handle_verification_events: bool,
}
142
143#[cfg(not(tarpaulin_include))]
144impl fmt::Debug for BaseClient {
145    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
146        f.debug_struct("BaseClient")
147            .field("session_meta", &self.state_store.session_meta())
148            .field("sync_token", &self.state_store.sync_token)
149            .finish_non_exhaustive()
150    }
151}
152
153impl BaseClient {
154    /// Create a new client.
155    ///
156    /// # Arguments
157    ///
158    /// * `config` - the configuration for the stores (state store, event cache
159    ///   store and crypto store).
160    pub fn new(config: StoreConfig) -> Self {
161        let store = BaseStateStore::new(config.state_store);
162
163        // Create the channel to receive `RoomInfoNotableUpdate`.
164        //
165        // Let's consider the channel will receive 5 updates for 100 rooms maximum. This
166        // is unrealistic in practise, as the sync mechanism is pretty unlikely to
167        // trigger such amount of updates, it's a safe value.
168        //
169        // Also, note that it must not be
170        // zero, because (i) it will panic, (ii) a new user has no room, but can create
171        // rooms; remember that the channel's capacity is immutable.
172        let (room_info_notable_update_sender, _room_info_notable_update_receiver) =
173            broadcast::channel(500);
174
175        BaseClient {
176            state_store: store,
177            event_cache_store: config.event_cache_store,
178            #[cfg(feature = "e2e-encryption")]
179            crypto_store: config.crypto_store,
180            #[cfg(feature = "e2e-encryption")]
181            olm_machine: Default::default(),
182            ignore_user_list_changes: Default::default(),
183            room_info_notable_update_sender,
184            #[cfg(feature = "e2e-encryption")]
185            room_key_recipient_strategy: Default::default(),
186            #[cfg(feature = "e2e-encryption")]
187            decryption_trust_requirement: TrustRequirement::Untrusted,
188            #[cfg(feature = "e2e-encryption")]
189            handle_verification_events: true,
190        }
191    }
192
193    /// Clones the current base client to use the same crypto store but a
194    /// different, in-memory store config, and resets transient state.
195    #[cfg(feature = "e2e-encryption")]
196    pub async fn clone_with_in_memory_state_store(
197        &self,
198        cross_process_store_locks_holder_name: &str,
199        handle_verification_events: bool,
200    ) -> Result<Self> {
201        let config = StoreConfig::new(cross_process_store_locks_holder_name.to_owned())
202            .state_store(MemoryStore::new());
203        let config = config.crypto_store(self.crypto_store.clone());
204
205        let copy = Self {
206            state_store: BaseStateStore::new(config.state_store),
207            event_cache_store: config.event_cache_store,
208            // We copy the crypto store as well as the `OlmMachine` for two reasons:
209            // 1. The `self.crypto_store` is the same as the one used inside the `OlmMachine`.
210            // 2. We need to ensure that the parent and child use the same data and caches inside
211            //    the `OlmMachine` so the various ratchets and places where new randomness gets
212            //    introduced don't diverge, i.e. one-time keys that get generated by the Olm Account
213            //    or Olm sessions when they encrypt or decrypt messages.
214            crypto_store: self.crypto_store.clone(),
215            olm_machine: self.olm_machine.clone(),
216            ignore_user_list_changes: Default::default(),
217            room_info_notable_update_sender: self.room_info_notable_update_sender.clone(),
218            room_key_recipient_strategy: self.room_key_recipient_strategy.clone(),
219            decryption_trust_requirement: self.decryption_trust_requirement,
220            handle_verification_events,
221        };
222
223        copy.state_store
224            .derive_from_other(&self.state_store, &copy.room_info_notable_update_sender)
225            .await?;
226
227        Ok(copy)
228    }
229
230    /// Clones the current base client to use the same crypto store but a
231    /// different, in-memory store config, and resets transient state.
232    #[cfg(not(feature = "e2e-encryption"))]
233    #[allow(clippy::unused_async)]
234    pub async fn clone_with_in_memory_state_store(
235        &self,
236        cross_process_store_locks_holder: &str,
237        _handle_verification_events: bool,
238    ) -> Result<Self> {
239        let config = StoreConfig::new(cross_process_store_locks_holder.to_owned())
240            .state_store(MemoryStore::new());
241        Ok(Self::new(config))
242    }
243
    /// Get the session meta information.
    ///
    /// If the client is currently logged in, this will return a
    /// [`SessionMeta`] object which contains the user ID and device ID.
    /// Otherwise it returns `None`.
    ///
    /// The value is read from the state store.
    pub fn session_meta(&self) -> Option<&SessionMeta> {
        self.state_store.session_meta()
    }
252
    /// Get all the rooms this client knows about.
    ///
    /// The list is obtained from the state store and returned as an owned
    /// `Vec`.
    pub fn rooms(&self) -> Vec<Room> {
        self.state_store.rooms()
    }
257
    /// Get all the rooms this client knows about, filtered by room state.
    ///
    /// # Arguments
    ///
    /// * `filter` - the [`RoomStateFilter`] forwarded to the state store to
    ///   select which rooms are returned.
    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
        self.state_store.rooms_filtered(filter)
    }
262
    /// Get a stream of all the rooms changes, in addition to the existing
    /// rooms.
    ///
    /// Returns a pair made of a [`Vector`] of rooms and a [`Stream`] whose
    /// items are batches of [`VectorDiff`]s; both come from the state store.
    pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>>) {
        self.state_store.rooms_stream()
    }
268
    /// Lookup the Room for the given RoomId, or create one, if it didn't exist
    /// yet in the store
    ///
    /// # Arguments
    ///
    /// * `room_id` - the ID of the room to look up or create.
    ///
    /// * `room_state` - the [`RoomState`] handed to the state store along with
    ///   the room ID.
    pub fn get_or_create_room(&self, room_id: &RoomId, room_state: RoomState) -> Room {
        self.state_store.get_or_create_room(
            room_id,
            room_state,
            // The store receives its own handle on the notable update channel.
            self.room_info_notable_update_sender.clone(),
        )
    }
278
    /// Get a reference to the state store.
    ///
    /// The reference is obtained by dereferencing the internal
    /// `BaseStateStore` wrapper.
    pub fn state_store(&self) -> &DynStateStore {
        self.state_store.deref()
    }
283
    /// Get a reference to the event cache store.
    ///
    /// This is the [`EventCacheStoreLock`] the client was built with.
    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
        &self.event_cache_store
    }
288
    /// Check whether the client has been activated.
    ///
    /// See [`BaseClient::activate`] to know what it means.
    ///
    /// Returns `true` when the state store holds a session meta.
    pub fn is_active(&self) -> bool {
        self.state_store.session_meta().is_some()
    }
295
    /// Activate the client.
    ///
    /// A client is considered active when:
    ///
    /// 1. It has a `SessionMeta` (user ID and device ID),
    /// 2. Has loaded cached data from storage,
    /// 3. If encryption is enabled, it also initialized or restored its
    ///    `OlmMachine`.
    ///
    /// # Arguments
    ///
    /// * `session_meta` - The meta of a session that the user already has from
    ///   a previous login call.
    ///
    /// * `custom_account` - A custom
    ///   [`matrix_sdk_crypto::vodozemac::olm::Account`] to be used for the
    ///   identity and one-time keys of this [`BaseClient`]. If no account is
    ///   provided, a new default one or one from the store will be used. If an
    ///   account is provided and one already exists in the store for this
    ///   [`UserId`]/[`DeviceId`] combination, an error will be raised. This is
    ///   useful if one wishes to create identity keys before knowing the
    ///   user/device IDs, e.g., to use the identity key as the device ID.
    ///
    /// # Errors
    ///
    /// Any error returned while loading the rooms, loading the sync token or
    /// (with the `e2e-encryption` feature) regenerating the `OlmMachine` is
    /// propagated to the caller.
    ///
    /// # Panics
    ///
    /// This method panics if it is called twice.
    pub async fn activate(
        &self,
        session_meta: SessionMeta,
        #[cfg(feature = "e2e-encryption")] custom_account: Option<
            crate::crypto::vodozemac::olm::Account,
        >,
    ) -> Result<()> {
        debug!(user_id = ?session_meta.user_id, device_id = ?session_meta.device_id, "Activating the client");

        // Load the cached data first: the rooms of this user, then the sync
        // token.
        self.state_store
            .load_rooms(&session_meta.user_id, &self.room_info_notable_update_sender)
            .await?;
        self.state_store.load_sync_token().await?;

        // The session meta is only stored once the cached data has been
        // loaded successfully.
        self.state_store.set_session_meta(session_meta);

        // `regenerate_olm` reads the session meta back from the state store,
        // so it has to run after `set_session_meta`.
        #[cfg(feature = "e2e-encryption")]
        self.regenerate_olm(custom_account).await?;

        Ok(())
    }
342
343    /// Recreate an `OlmMachine` from scratch.
344    ///
345    /// In particular, this will clear all its caches.
346    #[cfg(feature = "e2e-encryption")]
347    pub async fn regenerate_olm(
348        &self,
349        custom_account: Option<crate::crypto::vodozemac::olm::Account>,
350    ) -> Result<()> {
351        tracing::debug!("regenerating OlmMachine");
352        let session_meta = self.session_meta().ok_or(Error::OlmError(OlmError::MissingSession))?;
353
354        // Recreate the `OlmMachine` and wipe the in-memory cache in the store
355        // because we suspect it has stale data.
356        let olm_machine = OlmMachine::with_store(
357            &session_meta.user_id,
358            &session_meta.device_id,
359            self.crypto_store.clone(),
360            custom_account,
361        )
362        .await
363        .map_err(OlmError::from)?;
364
365        *self.olm_machine.write().await = Some(olm_machine);
366        Ok(())
367    }
368
    /// Get the current, if any, sync token of the client.
    /// This will be None if the client didn't sync at least once.
    ///
    /// The token is read from the state store and returned as an owned clone.
    pub async fn sync_token(&self) -> Option<String> {
        self.state_store.sync_token.read().await.clone()
    }
374
375    #[cfg(feature = "e2e-encryption")]
376    async fn handle_verification_event(
377        &self,
378        event: &AnySyncMessageLikeEvent,
379        room_id: &RoomId,
380    ) -> Result<()> {
381        if !self.handle_verification_events {
382            return Ok(());
383        }
384
385        if let Some(olm) = self.olm_machine().await.as_ref() {
386            olm.receive_verification_event(&event.clone().into_full_event(room_id.to_owned()))
387                .await?;
388        }
389
390        Ok(())
391    }
392
393    /// Attempt to decrypt the given raw event into a [`TimelineEvent`].
394    ///
395    /// In the case of a decryption error, returns a [`TimelineEvent`]
396    /// representing the decryption error; in the case of problems with our
397    /// application, returns `Err`.
398    ///
399    /// Returns `Ok(None)` if encryption is not configured.
400    #[cfg(feature = "e2e-encryption")]
401    async fn decrypt_sync_room_event(
402        &self,
403        event: &Raw<AnySyncTimelineEvent>,
404        room_id: &RoomId,
405    ) -> Result<Option<TimelineEvent>> {
406        let olm = self.olm_machine().await;
407        let Some(olm) = olm.as_ref() else { return Ok(None) };
408
409        let decryption_settings = DecryptionSettings {
410            sender_device_trust_requirement: self.decryption_trust_requirement,
411        };
412
413        let event = match olm
414            .try_decrypt_room_event(event.cast_ref(), room_id, &decryption_settings)
415            .await?
416        {
417            RoomEventDecryptionResult::Decrypted(decrypted) => {
418                let event: TimelineEvent = decrypted.into();
419
420                if let Ok(AnySyncTimelineEvent::MessageLike(e)) = event.raw().deserialize() {
421                    match &e {
422                        AnySyncMessageLikeEvent::RoomMessage(SyncMessageLikeEvent::Original(
423                            original_event,
424                        )) => {
425                            if let MessageType::VerificationRequest(_) =
426                                &original_event.content.msgtype
427                            {
428                                self.handle_verification_event(&e, room_id).await?;
429                            }
430                        }
431                        _ if e.event_type().to_string().starts_with("m.key.verification") => {
432                            self.handle_verification_event(&e, room_id).await?;
433                        }
434                        _ => (),
435                    }
436                }
437                event
438            }
439            RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
440                TimelineEvent::new_utd_event(event.clone(), utd_info)
441            }
442        };
443
444        Ok(Some(event))
445    }
446
    /// Process the timeline events of a room received during a sync, and
    /// build the resulting [`Timeline`].
    ///
    /// Every raw event is wrapped in a [`TimelineEvent`] and pushed onto the
    /// returned timeline, including events that fail to deserialize (a
    /// warning is logged for those). For events that deserialize:
    ///
    /// * State events, unless `ignore_state_events` is set, are recorded in
    ///   `changes`. Member events additionally update the ambiguity cache,
    ///   `user_ids` (inserted on join/invite, removed otherwise) and the
    ///   profiles; every other state event is applied to `room_info`.
    /// * Redactions that resolve to a redacted event ID are applied to
    ///   `room_info` and recorded in `changes`.
    /// * With the `e2e-encryption` feature, encrypted events are decrypted
    ///   (replacing the plaintext assumption), and verification events are
    ///   forwarded to the verification handling.
    ///
    /// After each deserialized event the push context is refreshed and the
    /// push rules are evaluated; matching actions are stored on the event and,
    /// when any of them should notify, a [`Notification`] is appended to
    /// `notifications` for this room.
    ///
    /// # Arguments
    ///
    /// * `room` - The [`Room`] the timeline belongs to.
    /// * `limited` - Forwarded to [`Timeline::new`].
    /// * `events` - The raw timeline events to process, in order.
    /// * `ignore_state_events` - When `true`, state events found in the
    ///   timeline are left untouched.
    /// * `prev_batch` - Forwarded to [`Timeline::new`].
    /// * `push_rules` - The push rules used to compute the push actions.
    /// * `user_ids` - Set of user IDs updated from the member events.
    /// * `room_info` - The current room's info.
    /// * `changes` - The accumulated list of changes to apply once the
    ///   processing is finished.
    /// * `notifications` - Notifications to post, keyed by room ID.
    /// * `ambiguity_cache` - The cache updated with every member event.
    ///
    /// # Errors
    ///
    /// Errors from the ambiguity cache, the push context computation,
    /// decryption and the verification handling are propagated.
    #[allow(clippy::too_many_arguments)]
    #[instrument(skip_all, fields(room_id = ?room_info.room_id))]
    pub(crate) async fn handle_timeline(
        &self,
        room: &Room,
        limited: bool,
        events: Vec<Raw<AnySyncTimelineEvent>>,
        ignore_state_events: bool,
        prev_batch: Option<String>,
        push_rules: &Ruleset,
        user_ids: &mut BTreeSet<OwnedUserId>,
        room_info: &mut RoomInfo,
        changes: &mut StateChanges,
        notifications: &mut BTreeMap<OwnedRoomId, Vec<Notification>>,
        ambiguity_cache: &mut AmbiguityCache,
    ) -> Result<Timeline> {
        let mut timeline = Timeline::new(limited, prev_batch);
        // The push context may not be available yet (`None`); it is retried
        // after each event below.
        let mut push_context = self.get_push_room_context(room, room_info, changes).await?;

        for raw_event in events {
            // Start by assuming we have a plaintext event. We'll replace it with a
            // decrypted or UTD event below if necessary.
            let mut event = TimelineEvent::new(raw_event);

            match event.raw().deserialize() {
                Ok(e) => {
                    #[allow(clippy::single_match)]
                    match &e {
                        AnySyncTimelineEvent::State(s) if !ignore_state_events => {
                            match s {
                                AnySyncStateEvent::RoomMember(member) => {
                                    Box::pin(ambiguity_cache.handle_event(
                                        changes,
                                        room.room_id(),
                                        member,
                                    ))
                                    .await?;

                                    // Track who is joined or invited; any other
                                    // membership removes the user from the set.
                                    match member.membership() {
                                        MembershipState::Join | MembershipState::Invite => {
                                            user_ids.insert(member.state_key().to_owned());
                                        }
                                        _ => {
                                            user_ids.remove(member.state_key());
                                        }
                                    }

                                    handle_room_member_event_for_profiles(
                                        room.room_id(),
                                        member,
                                        changes,
                                    );
                                }
                                _ => {
                                    room_info.handle_state_event(s);
                                }
                            }

                            // Every handled state event is recorded, member
                            // events included.
                            let raw_event: Raw<AnySyncStateEvent> = event.raw().clone().cast();
                            changes.add_state_event(room.room_id(), s.clone(), raw_event);
                        }

                        // Reached only when `ignore_state_events` is `true`.
                        AnySyncTimelineEvent::State(_) => { /* do nothing */ }

                        AnySyncTimelineEvent::MessageLike(
                            AnySyncMessageLikeEvent::RoomRedaction(r),
                        ) => {
                            // Fall back to room version 1 when the room version
                            // is unknown.
                            let room_version =
                                room_info.room_version().unwrap_or(&RoomVersionId::V1);

                            if let Some(redacts) = r.redacts(room_version) {
                                room_info.handle_redaction(r, event.raw().cast_ref());
                                let raw_event = event.raw().clone().cast();

                                changes.add_redaction(room.room_id(), redacts, raw_event);
                            }
                        }

                        #[cfg(feature = "e2e-encryption")]
                        AnySyncTimelineEvent::MessageLike(e) => match e {
                            AnySyncMessageLikeEvent::RoomEncrypted(
                                SyncMessageLikeEvent::Original(_),
                            ) => {
                                // `None` means encryption is not configured; the
                                // event is then kept as-is.
                                if let Some(e) = Box::pin(
                                    self.decrypt_sync_room_event(event.raw(), room.room_id()),
                                )
                                .await?
                                {
                                    event = e;
                                }
                            }
                            AnySyncMessageLikeEvent::RoomMessage(
                                SyncMessageLikeEvent::Original(original_event),
                            ) => match &original_event.content.msgtype {
                                MessageType::VerificationRequest(_) => {
                                    Box::pin(self.handle_verification_event(e, room.room_id()))
                                        .await?;
                                }
                                _ => (),
                            },
                            // Any other message-like event is a verification
                            // event when its type has the verification prefix.
                            _ if e.event_type().to_string().starts_with("m.key.verification") => {
                                Box::pin(self.handle_verification_event(e, room.room_id())).await?;
                            }
                            _ => (),
                        },

                        #[cfg(not(feature = "e2e-encryption"))]
                        AnySyncTimelineEvent::MessageLike(_) => (),
                    }

                    // Refresh the existing push context with the latest room
                    // info and changes, or try again to create one.
                    if let Some(context) = &mut push_context {
                        self.update_push_room_context(
                            context,
                            room.own_user_id(),
                            room_info,
                            changes,
                        )
                    } else {
                        push_context = self.get_push_room_context(room, room_info, changes).await?;
                    }

                    if let Some(context) = &push_context {
                        // Evaluated against the current `event`, i.e. the
                        // decrypted or UTD one when it has been replaced above.
                        let actions = push_rules.get_actions(event.raw(), context);

                        if actions.iter().any(Action::should_notify) {
                            notifications.entry(room.room_id().to_owned()).or_default().push(
                                Notification {
                                    actions: actions.to_owned(),
                                    event: RawAnySyncOrStrippedTimelineEvent::Sync(
                                        event.raw().clone(),
                                    ),
                                },
                            );
                        }
                        event.push_actions = Some(actions.to_owned());
                    }
                }
                Err(e) => {
                    // The event is still pushed onto the timeline below.
                    warn!("Error deserializing event: {e}");
                }
            }

            timeline.events.push(event);
        }

        Ok(timeline)
    }
594
595    /// Handles the stripped state events in `invite_state`, modifying the
596    /// room's info and posting notifications as needed.
597    ///
598    /// * `room` - The [`Room`] to modify.
599    /// * `events` - The contents of `invite_state` in the form of list of pairs
600    ///   of raw stripped state events with their deserialized counterpart.
601    /// * `push_rules` - The push rules for this room.
602    /// * `room_info` - The current room's info.
603    /// * `changes` - The accumulated list of changes to apply once the
604    ///   processing is finished.
605    /// * `notifications` - Notifications to post for the current room.
606    #[instrument(skip_all, fields(room_id = ?room_info.room_id))]
607    pub(crate) async fn handle_invited_state(
608        &self,
609        room: &Room,
610        events: &[(Raw<AnyStrippedStateEvent>, AnyStrippedStateEvent)],
611        push_rules: &Ruleset,
612        room_info: &mut RoomInfo,
613        changes: &mut StateChanges,
614        notifications: &mut BTreeMap<OwnedRoomId, Vec<Notification>>,
615    ) -> Result<()> {
616        let mut state_events = BTreeMap::new();
617
618        for (raw_event, event) in events {
619            room_info.handle_stripped_state_event(event);
620            state_events
621                .entry(event.event_type())
622                .or_insert_with(BTreeMap::new)
623                .insert(event.state_key().to_owned(), raw_event.clone());
624        }
625
626        changes.stripped_state.insert(room_info.room_id().to_owned(), state_events.clone());
627
628        // We need to check for notifications after we have handled all state
629        // events, to make sure we have the full push context.
630        if let Some(push_context) = self.get_push_room_context(room, room_info, changes).await? {
631            // Check every event again for notification.
632            for event in state_events.values().flat_map(|map| map.values()) {
633                let actions = push_rules.get_actions(event, &push_context);
634                if actions.iter().any(Action::should_notify) {
635                    notifications.entry(room.room_id().to_owned()).or_default().push(
636                        Notification {
637                            actions: actions.to_owned(),
638                            event: RawAnySyncOrStrippedTimelineEvent::Stripped(event.clone()),
639                        },
640                    );
641                }
642            }
643        }
644
645        Ok(())
646    }
647
648    /// Process the events provided during a sync.
649    ///
650    /// events must be exactly the same list of events that are in raw_events,
651    /// but deserialised. We demand them here to avoid deserialising
652    /// multiple times.
653    #[instrument(skip_all, fields(room_id = ?room_info.room_id))]
654    pub(crate) async fn handle_state(
655        &self,
656        raw_events: &[Raw<AnySyncStateEvent>],
657        events: &[AnySyncStateEvent],
658        room_info: &mut RoomInfo,
659        changes: &mut StateChanges,
660        ambiguity_cache: &mut AmbiguityCache,
661    ) -> StoreResult<BTreeSet<OwnedUserId>> {
662        let mut state_events = BTreeMap::new();
663        let mut user_ids = BTreeSet::new();
664
665        assert_eq!(raw_events.len(), events.len());
666
667        for (raw_event, event) in iter::zip(raw_events, events) {
668            room_info.handle_state_event(event);
669
670            if let AnySyncStateEvent::RoomMember(member) = &event {
671                ambiguity_cache.handle_event(changes, &room_info.room_id, member).await?;
672
673                match member.membership() {
674                    MembershipState::Join | MembershipState::Invite => {
675                        user_ids.insert(member.state_key().to_owned());
676                    }
677                    _ => (),
678                }
679
680                handle_room_member_event_for_profiles(&room_info.room_id, member, changes);
681            }
682
683            state_events
684                .entry(event.event_type())
685                .or_insert_with(BTreeMap::new)
686                .insert(event.state_key().to_owned(), raw_event.clone());
687        }
688
689        changes.state.insert((*room_info.room_id).to_owned(), state_events);
690
691        Ok(user_ids)
692    }
693
694    #[instrument(skip_all, fields(?room_id))]
695    pub(crate) async fn handle_room_account_data(
696        &self,
697        room_id: &RoomId,
698        events: &[Raw<AnyRoomAccountDataEvent>],
699        changes: &mut StateChanges,
700        room_info_notable_updates: &mut BTreeMap<OwnedRoomId, RoomInfoNotableUpdateReasons>,
701    ) {
702        // Small helper to make the code easier to read.
703        //
704        // It finds the appropriate `RoomInfo`, allowing the caller to modify it, and
705        // save it in the correct place.
706        fn on_room_info<F>(
707            room_id: &RoomId,
708            changes: &mut StateChanges,
709            client: &BaseClient,
710            mut on_room_info: F,
711        ) where
712            F: FnMut(&mut RoomInfo),
713        {
714            // `StateChanges` has the `RoomInfo`.
715            if let Some(room_info) = changes.room_infos.get_mut(room_id) {
716                // Show time.
717                on_room_info(room_info);
718            }
719            // The `BaseClient` has the `Room`, which has the `RoomInfo`.
720            else if let Some(room) = client.state_store.room(room_id) {
721                // Clone the `RoomInfo`.
722                let mut room_info = room.clone_info();
723
724                // Show time.
725                on_room_info(&mut room_info);
726
727                // Update the `RoomInfo` via `StateChanges`.
728                changes.add_room(room_info);
729            }
730        }
731
732        // Helper to update the unread marker for stable and unstable prefixes.
733        fn on_unread_marker(
734            room_id: &RoomId,
735            content: &MarkedUnreadEventContent,
736            room_info: &mut RoomInfo,
737            room_info_notable_updates: &mut BTreeMap<OwnedRoomId, RoomInfoNotableUpdateReasons>,
738        ) {
739            if room_info.base_info.is_marked_unread != content.unread {
740                // Notify the room list about a manual read marker change if the
741                // value's changed.
742                room_info_notable_updates
743                    .entry(room_id.to_owned())
744                    .or_default()
745                    .insert(RoomInfoNotableUpdateReasons::UNREAD_MARKER);
746            }
747
748            room_info.base_info.is_marked_unread = content.unread;
749        }
750
751        // Handle new events.
752        for raw_event in events {
753            match raw_event.deserialize() {
754                Ok(event) => {
755                    changes.add_room_account_data(room_id, event.clone(), raw_event.clone());
756
757                    match event {
758                        AnyRoomAccountDataEvent::MarkedUnread(event) => {
759                            on_room_info(room_id, changes, self, |room_info| {
760                                on_unread_marker(
761                                    room_id,
762                                    &event.content,
763                                    room_info,
764                                    room_info_notable_updates,
765                                );
766                            });
767                        }
768                        AnyRoomAccountDataEvent::UnstableMarkedUnread(event) => {
769                            on_room_info(room_id, changes, self, |room_info| {
770                                on_unread_marker(
771                                    room_id,
772                                    &event.content.0,
773                                    room_info,
774                                    room_info_notable_updates,
775                                );
776                            });
777                        }
778                        AnyRoomAccountDataEvent::Tag(event) => {
779                            on_room_info(room_id, changes, self, |room_info| {
780                                room_info.base_info.handle_notable_tags(&event.content.tags);
781                            });
782                        }
783
784                        // Nothing.
785                        _ => {}
786                    }
787                }
788
789                Err(err) => {
790                    warn!("unable to deserialize account data event: {err}");
791                }
792            }
793        }
794    }
795
796    #[cfg(feature = "e2e-encryption")]
797    #[instrument(skip_all)]
798    pub(crate) async fn preprocess_to_device_events(
799        &self,
800        encryption_sync_changes: EncryptionSyncChanges<'_>,
801        changes: &mut StateChanges,
802        room_info_notable_updates: &mut BTreeMap<OwnedRoomId, RoomInfoNotableUpdateReasons>,
803    ) -> Result<Vec<Raw<ruma::events::AnyToDeviceEvent>>> {
804        if let Some(o) = self.olm_machine().await.as_ref() {
805            // Let the crypto machine handle the sync response, this
806            // decrypts to-device events, but leaves room events alone.
807            // This makes sure that we have the decryption keys for the room
808            // events at hand.
809            let (events, room_key_updates) =
810                o.receive_sync_changes(encryption_sync_changes).await?;
811
812            for room_key_update in room_key_updates {
813                if let Some(room) = self.get_room(&room_key_update.room_id) {
814                    self.decrypt_latest_events(&room, changes, room_info_notable_updates).await;
815                }
816            }
817
818            Ok(events)
819        } else {
820            // If we have no `OlmMachine`, just return the events that were passed in.
821            // This should not happen unless we forget to set things up by calling
822            // `Self::activate()`.
823            Ok(encryption_sync_changes.to_device_events)
824        }
825    }
826
827    /// Decrypt any of this room's latest_encrypted_events
828    /// that we can and if we can, change latest_event to reflect what we
829    /// found, and remove any older encrypted events from
830    /// latest_encrypted_events.
831    #[cfg(feature = "e2e-encryption")]
832    async fn decrypt_latest_events(
833        &self,
834        room: &Room,
835        changes: &mut StateChanges,
836        room_info_notable_updates: &mut BTreeMap<OwnedRoomId, RoomInfoNotableUpdateReasons>,
837    ) {
838        // Try to find a message we can decrypt and is suitable for using as the latest
839        // event. If we found one, set it as the latest and delete any older
840        // encrypted events
841        if let Some((found, found_index)) = self.decrypt_latest_suitable_event(room).await {
842            room.on_latest_event_decrypted(found, found_index, changes, room_info_notable_updates);
843        }
844    }
845
846    /// Attempt to decrypt a latest event, trying the latest stored encrypted
847    /// one first, and walking backwards, stopping when we find an event
848    /// that we can decrypt, and that is suitable to be the latest event
849    /// (i.e. we can usefully display it as a message preview). Returns the
850    /// decrypted event if we found one, along with its index in the
851    /// latest_encrypted_events list, or None if we didn't find one.
852    #[cfg(feature = "e2e-encryption")]
853    async fn decrypt_latest_suitable_event(
854        &self,
855        room: &Room,
856    ) -> Option<(Box<LatestEvent>, usize)> {
857        let enc_events = room.latest_encrypted_events();
858        let power_levels = room.power_levels().await.ok();
859        let power_levels_info = Some(room.own_user_id()).zip(power_levels.as_ref());
860
861        // Walk backwards through the encrypted events, looking for one we can decrypt
862        for (i, event) in enc_events.iter().enumerate().rev() {
863            // Size of the decrypt_sync_room_event future should not impact this
864            // async fn since it is likely that there aren't even any encrypted
865            // events when calling it.
866            let decrypt_sync_room_event =
867                Box::pin(self.decrypt_sync_room_event(event, room.room_id()));
868
869            if let Ok(Some(decrypted)) = decrypt_sync_room_event.await {
870                // We found an event we can decrypt
871                if let Ok(any_sync_event) = decrypted.raw().deserialize() {
872                    // We can deserialize it to find its type
873                    match is_suitable_for_latest_event(&any_sync_event, power_levels_info) {
874                        PossibleLatestEvent::YesRoomMessage(_)
875                        | PossibleLatestEvent::YesPoll(_)
876                        | PossibleLatestEvent::YesCallInvite(_)
877                        | PossibleLatestEvent::YesCallNotify(_)
878                        | PossibleLatestEvent::YesSticker(_)
879                        | PossibleLatestEvent::YesKnockedStateEvent(_) => {
880                            return Some((Box::new(LatestEvent::new(decrypted)), i));
881                        }
882                        _ => (),
883                    }
884                }
885            }
886        }
887        None
888    }
889
890    /// User has knocked on a room.
891    ///
892    /// Update the internal and cached state accordingly. Return the final Room.
893    pub async fn room_knocked(&self, room_id: &RoomId) -> Result<Room> {
894        let room = self.state_store.get_or_create_room(
895            room_id,
896            RoomState::Knocked,
897            self.room_info_notable_update_sender.clone(),
898        );
899
900        if room.state() != RoomState::Knocked {
901            let _sync_lock = self.sync_lock().lock().await;
902
903            let mut room_info = room.clone_info();
904            room_info.mark_as_knocked();
905            room_info.mark_state_partially_synced();
906            room_info.mark_members_missing(); // the own member event changed
907            let mut changes = StateChanges::default();
908            changes.add_room(room_info.clone());
909            self.state_store.save_changes(&changes).await?; // Update the store
910            room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
911        }
912
913        Ok(room)
914    }
915
916    /// User has joined a room.
917    ///
918    /// Update the internal and cached state accordingly. Return the final Room.
919    pub async fn room_joined(&self, room_id: &RoomId) -> Result<Room> {
920        let room = self.state_store.get_or_create_room(
921            room_id,
922            RoomState::Joined,
923            self.room_info_notable_update_sender.clone(),
924        );
925
926        if room.state() != RoomState::Joined {
927            let _sync_lock = self.sync_lock().lock().await;
928
929            let mut room_info = room.clone_info();
930            room_info.mark_as_joined();
931            room_info.mark_state_partially_synced();
932            room_info.mark_members_missing(); // the own member event changed
933            let mut changes = StateChanges::default();
934            changes.add_room(room_info.clone());
935            self.state_store.save_changes(&changes).await?; // Update the store
936            room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
937        }
938
939        Ok(room)
940    }
941
942    /// User has left a room.
943    ///
944    /// Update the internal and cached state accordingly.
945    pub async fn room_left(&self, room_id: &RoomId) -> Result<()> {
946        let room = self.state_store.get_or_create_room(
947            room_id,
948            RoomState::Left,
949            self.room_info_notable_update_sender.clone(),
950        );
951
952        if room.state() != RoomState::Left {
953            let _sync_lock = self.sync_lock().lock().await;
954
955            let mut room_info = room.clone_info();
956            room_info.mark_as_left();
957            room_info.mark_state_partially_synced();
958            room_info.mark_members_missing(); // the own member event changed
959            let mut changes = StateChanges::default();
960            changes.add_room(room_info.clone());
961            self.state_store.save_changes(&changes).await?; // Update the store
962            room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
963        }
964
965        Ok(())
966    }
967
968    /// Get access to the store's sync lock.
969    pub fn sync_lock(&self) -> &Mutex<()> {
970        self.state_store.sync_lock()
971    }
972
973    /// Receive a response from a sync call.
974    ///
975    /// # Arguments
976    ///
977    /// * `response` - The response that we received after a successful sync.
978    #[instrument(skip_all)]
979    pub async fn receive_sync_response(
980        &self,
981        response: api::sync::sync_events::v3::Response,
982    ) -> Result<SyncResponse> {
983        self.receive_sync_response_with_requested_required_states(
984            response,
985            &RequestedRequiredStates::default(),
986        )
987        .await
988    }
989
990    /// Receive a response from a sync call, with the requested required state
991    /// events.
992    ///
993    /// # Arguments
994    ///
995    /// * `response` - The response that we received after a successful sync.
996    /// * `requested_required_states` - The requested required state events.
997    pub async fn receive_sync_response_with_requested_required_states(
998        &self,
999        response: api::sync::sync_events::v3::Response,
1000        requested_required_states: &RequestedRequiredStates,
1001    ) -> Result<SyncResponse> {
1002        // The server might respond multiple times with the same sync token, in
1003        // that case we already received this response and there's nothing to
1004        // do.
1005        if self.state_store.sync_token.read().await.as_ref() == Some(&response.next_batch) {
1006            info!("Got the same sync response twice");
1007            return Ok(SyncResponse::default());
1008        }
1009
1010        let now = Instant::now();
1011        let mut changes = Box::new(StateChanges::new(response.next_batch.clone()));
1012
1013        #[cfg_attr(not(feature = "e2e-encryption"), allow(unused_mut))]
1014        let mut room_info_notable_updates =
1015            BTreeMap::<OwnedRoomId, RoomInfoNotableUpdateReasons>::new();
1016
1017        #[cfg(feature = "e2e-encryption")]
1018        let to_device = self
1019            .preprocess_to_device_events(
1020                EncryptionSyncChanges {
1021                    to_device_events: response.to_device.events,
1022                    changed_devices: &response.device_lists,
1023                    one_time_keys_counts: &response.device_one_time_keys_count,
1024                    unused_fallback_keys: response.device_unused_fallback_key_types.as_deref(),
1025                    next_batch_token: Some(response.next_batch.clone()),
1026                },
1027                &mut changes,
1028                &mut room_info_notable_updates,
1029            )
1030            .await?;
1031
1032        #[cfg(not(feature = "e2e-encryption"))]
1033        let to_device = response.to_device.events;
1034
1035        let mut ambiguity_cache = AmbiguityCache::new(self.state_store.inner.clone());
1036
1037        let account_data_processor = AccountDataProcessor::process(&response.account_data.events);
1038
1039        let push_rules = self.get_push_rules(&account_data_processor).await?;
1040
1041        let mut new_rooms = RoomUpdates::default();
1042        let mut notifications = Default::default();
1043
1044        let mut updated_members_in_room: BTreeMap<OwnedRoomId, BTreeSet<OwnedUserId>> =
1045            BTreeMap::new();
1046
1047        for (room_id, new_info) in response.rooms.join {
1048            let room = self.state_store.get_or_create_room(
1049                &room_id,
1050                RoomState::Joined,
1051                self.room_info_notable_update_sender.clone(),
1052            );
1053
1054            let mut room_info = room.clone_info();
1055
1056            room_info.mark_as_joined();
1057            room_info.update_from_ruma_summary(&new_info.summary);
1058            room_info.set_prev_batch(new_info.timeline.prev_batch.as_deref());
1059            room_info.mark_state_fully_synced();
1060            room_info.handle_encryption_state(requested_required_states.for_room(&room_id));
1061
1062            let state_events = Self::deserialize_state_events(&new_info.state.events);
1063            let (raw_state_events, state_events): (Vec<_>, Vec<_>) =
1064                state_events.into_iter().unzip();
1065
1066            let mut user_ids = self
1067                .handle_state(
1068                    &raw_state_events,
1069                    &state_events,
1070                    &mut room_info,
1071                    &mut changes,
1072                    &mut ambiguity_cache,
1073                )
1074                .await?;
1075
1076            updated_members_in_room.insert(room_id.to_owned(), user_ids.clone());
1077
1078            for raw in &new_info.ephemeral.events {
1079                match raw.deserialize() {
1080                    Ok(AnySyncEphemeralRoomEvent::Receipt(event)) => {
1081                        changes.add_receipts(&room_id, event.content);
1082                    }
1083                    Ok(_) => {}
1084                    Err(e) => {
1085                        let event_id: Option<String> = raw.get_field("event_id").ok().flatten();
1086                        #[rustfmt::skip]
1087                        info!(
1088                            ?room_id, event_id,
1089                            "Failed to deserialize ephemeral room event: {e}"
1090                        );
1091                    }
1092                }
1093            }
1094
1095            if new_info.timeline.limited {
1096                room_info.mark_members_missing();
1097            }
1098
1099            let timeline = self
1100                .handle_timeline(
1101                    &room,
1102                    new_info.timeline.limited,
1103                    new_info.timeline.events,
1104                    false,
1105                    new_info.timeline.prev_batch,
1106                    &push_rules,
1107                    &mut user_ids,
1108                    &mut room_info,
1109                    &mut changes,
1110                    &mut notifications,
1111                    &mut ambiguity_cache,
1112                )
1113                .await?;
1114
1115            // Save the new `RoomInfo`.
1116            changes.add_room(room_info);
1117
1118            self.handle_room_account_data(
1119                &room_id,
1120                &new_info.account_data.events,
1121                &mut changes,
1122                &mut Default::default(),
1123            )
1124            .await;
1125
1126            // `Self::handle_room_account_data` might have updated the `RoomInfo`. Let's
1127            // fetch it again.
1128            //
1129            // SAFETY: `unwrap` is safe because the `RoomInfo` has been inserted 2 lines
1130            // above.
1131            let mut room_info = changes.room_infos.get(&room_id).unwrap().clone();
1132
1133            #[cfg(feature = "e2e-encryption")]
1134            if room_info.encryption_state().is_encrypted() {
1135                if let Some(o) = self.olm_machine().await.as_ref() {
1136                    if !room.encryption_state().is_encrypted() {
1137                        // The room turned on encryption in this sync, we need
1138                        // to also get all the existing users and mark them for
1139                        // tracking.
1140                        let user_ids = self
1141                            .state_store
1142                            .get_user_ids(&room_id, RoomMemberships::ACTIVE)
1143                            .await?;
1144                        o.update_tracked_users(user_ids.iter().map(Deref::deref)).await?
1145                    }
1146
1147                    o.update_tracked_users(user_ids.iter().map(Deref::deref)).await?;
1148                }
1149            }
1150
1151            let notification_count = new_info.unread_notifications.into();
1152            room_info.update_notification_count(notification_count);
1153
1154            let ambiguity_changes = ambiguity_cache.changes.remove(&room_id).unwrap_or_default();
1155
1156            new_rooms.join.insert(
1157                room_id,
1158                JoinedRoomUpdate::new(
1159                    timeline,
1160                    new_info.state.events,
1161                    new_info.account_data.events,
1162                    new_info.ephemeral.events,
1163                    notification_count,
1164                    ambiguity_changes,
1165                ),
1166            );
1167
1168            changes.add_room(room_info);
1169        }
1170
1171        for (room_id, new_info) in response.rooms.leave {
1172            let room = self.state_store.get_or_create_room(
1173                &room_id,
1174                RoomState::Left,
1175                self.room_info_notable_update_sender.clone(),
1176            );
1177
1178            let mut room_info = room.clone_info();
1179            room_info.mark_as_left();
1180            room_info.mark_state_partially_synced();
1181            room_info.handle_encryption_state(requested_required_states.for_room(&room_id));
1182
1183            let state_events = Self::deserialize_state_events(&new_info.state.events);
1184            let (raw_state_events, state_events): (Vec<_>, Vec<_>) =
1185                state_events.into_iter().unzip();
1186
1187            let mut user_ids = self
1188                .handle_state(
1189                    &raw_state_events,
1190                    &state_events,
1191                    &mut room_info,
1192                    &mut changes,
1193                    &mut ambiguity_cache,
1194                )
1195                .await?;
1196
1197            let timeline = self
1198                .handle_timeline(
1199                    &room,
1200                    new_info.timeline.limited,
1201                    new_info.timeline.events,
1202                    false,
1203                    new_info.timeline.prev_batch,
1204                    &push_rules,
1205                    &mut user_ids,
1206                    &mut room_info,
1207                    &mut changes,
1208                    &mut notifications,
1209                    &mut ambiguity_cache,
1210                )
1211                .await?;
1212
1213            // Save the new `RoomInfo`.
1214            changes.add_room(room_info);
1215
1216            self.handle_room_account_data(
1217                &room_id,
1218                &new_info.account_data.events,
1219                &mut changes,
1220                &mut Default::default(),
1221            )
1222            .await;
1223
1224            let ambiguity_changes = ambiguity_cache.changes.remove(&room_id).unwrap_or_default();
1225
1226            new_rooms.leave.insert(
1227                room_id,
1228                LeftRoomUpdate::new(
1229                    timeline,
1230                    new_info.state.events,
1231                    new_info.account_data.events,
1232                    ambiguity_changes,
1233                ),
1234            );
1235        }
1236
1237        for (room_id, new_info) in response.rooms.invite {
1238            let room = self.state_store.get_or_create_room(
1239                &room_id,
1240                RoomState::Invited,
1241                self.room_info_notable_update_sender.clone(),
1242            );
1243
1244            let invite_state =
1245                Self::deserialize_stripped_state_events(&new_info.invite_state.events);
1246
1247            let mut room_info = room.clone_info();
1248            room_info.mark_as_invited();
1249            room_info.mark_state_fully_synced();
1250
1251            self.handle_invited_state(
1252                &room,
1253                &invite_state,
1254                &push_rules,
1255                &mut room_info,
1256                &mut changes,
1257                &mut notifications,
1258            )
1259            .await?;
1260
1261            changes.add_room(room_info);
1262
1263            new_rooms.invite.insert(room_id, new_info);
1264        }
1265
1266        for (room_id, new_info) in response.rooms.knock {
1267            let room = self.state_store.get_or_create_room(
1268                &room_id,
1269                RoomState::Knocked,
1270                self.room_info_notable_update_sender.clone(),
1271            );
1272
1273            let knock_state = Self::deserialize_stripped_state_events(&new_info.knock_state.events);
1274
1275            let mut room_info = room.clone_info();
1276            room_info.mark_as_knocked();
1277            room_info.mark_state_fully_synced();
1278
1279            self.handle_invited_state(
1280                &room,
1281                &knock_state,
1282                &push_rules,
1283                &mut room_info,
1284                &mut changes,
1285                &mut notifications,
1286            )
1287            .await?;
1288
1289            changes.add_room(room_info);
1290
1291            new_rooms.knocked.insert(room_id, new_info);
1292        }
1293
1294        account_data_processor.apply(&mut changes, &self.state_store).await;
1295
1296        changes.presence = response
1297            .presence
1298            .events
1299            .iter()
1300            .filter_map(|e| {
1301                let event = e.deserialize().ok()?;
1302                Some((event.sender, e.clone()))
1303            })
1304            .collect();
1305
1306        changes.ambiguity_maps = ambiguity_cache.cache;
1307
1308        {
1309            let _sync_lock = self.sync_lock().lock().await;
1310            let prev_ignored_user_list = self.load_previous_ignored_user_list().await;
1311            self.state_store.save_changes(&changes).await?;
1312            *self.state_store.sync_token.write().await = Some(response.next_batch.clone());
1313            self.apply_changes(&changes, room_info_notable_updates, prev_ignored_user_list);
1314        }
1315
1316        // Now that all the rooms information have been saved, update the display name
1317        // cache (which relies on information stored in the database). This will
1318        // live in memory, until the next sync which will saves the room info to
1319        // disk; we do this to avoid saving that would be redundant with the
1320        // above. Oh well.
1321        new_rooms.update_in_memory_caches(&self.state_store).await;
1322
1323        for (room_id, member_ids) in updated_members_in_room {
1324            if let Some(room) = self.get_room(&room_id) {
1325                let _ =
1326                    room.room_member_updates_sender.send(RoomMembersUpdate::Partial(member_ids));
1327            }
1328        }
1329
1330        info!("Processed a sync response in {:?}", now.elapsed());
1331
1332        let response = SyncResponse {
1333            rooms: new_rooms,
1334            presence: response.presence.events,
1335            account_data: response.account_data.events,
1336            to_device,
1337            notifications,
1338        };
1339
1340        Ok(response)
1341    }
1342
1343    pub(crate) async fn load_previous_ignored_user_list(
1344        &self,
1345    ) -> Option<Raw<IgnoredUserListEvent>> {
1346        self.state_store().get_account_data_event_static().await.ok().flatten()
1347    }
1348
1349    pub(crate) fn apply_changes(
1350        &self,
1351        changes: &StateChanges,
1352        room_info_notable_updates: BTreeMap<OwnedRoomId, RoomInfoNotableUpdateReasons>,
1353        prev_ignored_user_list: Option<Raw<IgnoredUserListEvent>>,
1354    ) {
1355        if let Some(event) = changes.account_data.get(&GlobalAccountDataEventType::IgnoredUserList)
1356        {
1357            match event.deserialize_as::<IgnoredUserListEvent>() {
1358                Ok(event) => {
1359                    let user_ids: Vec<String> =
1360                        event.content.ignored_users.keys().map(|id| id.to_string()).collect();
1361
1362                    // Try to only trigger the observable if the ignored user list has changed,
1363                    // from the previous time we've seen it. If we couldn't load the previous event
1364                    // for any reason, always trigger.
1365                    if let Some(prev_user_ids) =
1366                        prev_ignored_user_list.and_then(|raw| raw.deserialize().ok()).map(|event| {
1367                            event
1368                                .content
1369                                .ignored_users
1370                                .keys()
1371                                .map(|id| id.to_string())
1372                                .collect::<Vec<_>>()
1373                        })
1374                    {
1375                        if user_ids != prev_user_ids {
1376                            self.ignore_user_list_changes.set(user_ids);
1377                        }
1378                    } else {
1379                        self.ignore_user_list_changes.set(user_ids);
1380                    }
1381                }
1382
1383                Err(error) => {
1384                    error!("Failed to deserialize ignored user list event: {error}")
1385                }
1386            }
1387        }
1388
1389        for (room_id, room_info) in &changes.room_infos {
1390            if let Some(room) = self.state_store.room(room_id) {
1391                let room_info_notable_update_reasons =
1392                    room_info_notable_updates.get(room_id).copied().unwrap_or_default();
1393
1394                room.set_room_info(room_info.clone(), room_info_notable_update_reasons)
1395            }
1396        }
1397    }
1398
1399    /// Receive a get member events response and convert it to a deserialized
1400    /// `MembersResponse`
1401    ///
1402    /// This client-server request must be made without filters to make sure all
1403    /// members are received. Otherwise, an error is returned.
1404    ///
1405    /// # Arguments
1406    ///
1407    /// * `room_id` - The room id this response belongs to.
1408    ///
1409    /// * `response` - The raw response that was received from the server.
1410    #[instrument(skip_all, fields(?room_id))]
1411    pub async fn receive_all_members(
1412        &self,
1413        room_id: &RoomId,
1414        request: &api::membership::get_member_events::v3::Request,
1415        response: &api::membership::get_member_events::v3::Response,
1416    ) -> Result<()> {
1417        if request.membership.is_some() || request.not_membership.is_some() || request.at.is_some()
1418        {
1419            // This function assumes all members are loaded at once to optimise how display
1420            // name disambiguation works. Using it with partial member list results
1421            // would produce incorrect disambiguated display name entries
1422            return Err(Error::InvalidReceiveMembersParameters);
1423        }
1424
1425        let Some(room) = self.state_store.room(room_id) else {
1426            // The room is unknown to us: leave early.
1427            return Ok(());
1428        };
1429
1430        let mut chunk = Vec::with_capacity(response.chunk.len());
1431        let mut changes = StateChanges::default();
1432
1433        #[cfg(feature = "e2e-encryption")]
1434        let mut user_ids = BTreeSet::new();
1435
1436        let mut ambiguity_map: HashMap<DisplayName, BTreeSet<OwnedUserId>> = Default::default();
1437
1438        for raw_event in &response.chunk {
1439            let member = match raw_event.deserialize() {
1440                Ok(ev) => ev,
1441                Err(e) => {
1442                    let event_id: Option<String> = raw_event.get_field("event_id").ok().flatten();
1443                    debug!(event_id, "Failed to deserialize member event: {e}");
1444                    continue;
1445                }
1446            };
1447
1448            // TODO: All the actions in this loop used to be done only when the membership
1449            // event was not in the store before. This was changed with the new room API,
1450            // because e.g. leaving a room makes members events outdated and they need to be
1451            // fetched by `members`. Therefore, they need to be overwritten here, even
1452            // if they exist.
1453            // However, this makes a new problem occur where setting the member events here
1454            // potentially races with the sync.
1455            // See <https://github.com/matrix-org/matrix-rust-sdk/issues/1205>.
1456
1457            #[cfg(feature = "e2e-encryption")]
1458            match member.membership() {
1459                MembershipState::Join | MembershipState::Invite => {
1460                    user_ids.insert(member.state_key().to_owned());
1461                }
1462                _ => (),
1463            }
1464
1465            if let StateEvent::Original(e) = &member {
1466                if let Some(d) = &e.content.displayname {
1467                    let display_name = DisplayName::new(d);
1468                    ambiguity_map
1469                        .entry(display_name)
1470                        .or_default()
1471                        .insert(member.state_key().clone());
1472                }
1473            }
1474
1475            let sync_member: SyncRoomMemberEvent = member.clone().into();
1476            handle_room_member_event_for_profiles(room_id, &sync_member, &mut changes);
1477
1478            changes
1479                .state
1480                .entry(room_id.to_owned())
1481                .or_default()
1482                .entry(member.event_type())
1483                .or_default()
1484                .insert(member.state_key().to_string(), raw_event.clone().cast());
1485            chunk.push(member);
1486        }
1487
1488        #[cfg(feature = "e2e-encryption")]
1489        if room.encryption_state().is_encrypted() {
1490            if let Some(o) = self.olm_machine().await.as_ref() {
1491                o.update_tracked_users(user_ids.iter().map(Deref::deref)).await?
1492            }
1493        }
1494
1495        changes.ambiguity_maps.insert(room_id.to_owned(), ambiguity_map);
1496
1497        let _sync_lock = self.sync_lock().lock().await;
1498        let mut room_info = room.clone_info();
1499        room_info.mark_members_synced();
1500        changes.add_room(room_info);
1501
1502        let prev_ignored_user_list = self.load_previous_ignored_user_list().await;
1503        self.state_store.save_changes(&changes).await?;
1504        self.apply_changes(&changes, Default::default(), prev_ignored_user_list);
1505
1506        let _ = room.room_member_updates_sender.send(RoomMembersUpdate::FullReload);
1507
1508        Ok(())
1509    }
1510
1511    /// Receive a successful filter upload response, the filter id will be
1512    /// stored under the given name in the store.
1513    ///
1514    /// The filter id can later be retrieved with the [`get_filter`] method.
1515    ///
1516    ///
1517    /// # Arguments
1518    ///
1519    /// * `filter_name` - The name that should be used to persist the filter id
1520    ///   in the store.
1521    ///
1522    /// * `response` - The successful filter upload response containing the
1523    ///   filter id.
1524    ///
1525    /// [`get_filter`]: #method.get_filter
1526    pub async fn receive_filter_upload(
1527        &self,
1528        filter_name: &str,
1529        response: &api::filter::create_filter::v3::Response,
1530    ) -> Result<()> {
1531        Ok(self
1532            .state_store
1533            .set_kv_data(
1534                StateStoreDataKey::Filter(filter_name),
1535                StateStoreDataValue::Filter(response.filter_id.clone()),
1536            )
1537            .await?)
1538    }
1539
1540    /// Get the filter id of a previously uploaded filter.
1541    ///
1542    /// *Note*: A filter will first need to be uploaded and persisted using
1543    /// [`receive_filter_upload`].
1544    ///
1545    /// # Arguments
1546    ///
1547    /// * `filter_name` - The name of the filter that was previously used to
1548    ///   persist the filter.
1549    ///
1550    /// [`receive_filter_upload`]: #method.receive_filter_upload
1551    pub async fn get_filter(&self, filter_name: &str) -> StoreResult<Option<String>> {
1552        let filter = self
1553            .state_store
1554            .get_kv_data(StateStoreDataKey::Filter(filter_name))
1555            .await?
1556            .map(|d| d.into_filter().expect("State store data not a filter"));
1557
1558        Ok(filter)
1559    }
1560
1561    /// Get a to-device request that will share a room key with users in a room.
1562    #[cfg(feature = "e2e-encryption")]
1563    pub async fn share_room_key(&self, room_id: &RoomId) -> Result<Vec<Arc<ToDeviceRequest>>> {
1564        match self.olm_machine().await.as_ref() {
1565            Some(o) => {
1566                let Some(room) = self.get_room(room_id) else {
1567                    return Err(Error::InsufficientData);
1568                };
1569
1570                let history_visibility = room.history_visibility_or_default();
1571                let Some(room_encryption_event) = room.encryption_settings() else {
1572                    return Err(Error::EncryptionNotEnabled);
1573                };
1574
1575                // Don't share the group session with members that are invited
1576                // if the history visibility is set to `Joined`
1577                let filter = if history_visibility == HistoryVisibility::Joined {
1578                    RoomMemberships::JOIN
1579                } else {
1580                    RoomMemberships::ACTIVE
1581                };
1582
1583                let members = self.state_store.get_user_ids(room_id, filter).await?;
1584
1585                let settings = EncryptionSettings::new(
1586                    room_encryption_event,
1587                    history_visibility,
1588                    self.room_key_recipient_strategy.clone(),
1589                );
1590
1591                Ok(o.share_room_key(room_id, members.iter().map(Deref::deref), settings).await?)
1592            }
1593            None => panic!("Olm machine wasn't started"),
1594        }
1595    }
1596
1597    /// Get the room with the given room id.
1598    ///
1599    /// # Arguments
1600    ///
1601    /// * `room_id` - The id of the room that should be fetched.
1602    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1603        self.state_store.room(room_id)
1604    }
1605
1606    /// Forget the room with the given room ID.
1607    ///
1608    /// The room will be dropped from the room list and the store.
1609    ///
1610    /// # Arguments
1611    ///
1612    /// * `room_id` - The id of the room that should be forgotten.
1613    pub async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
1614        // Forget the room in the state store.
1615        self.state_store.forget_room(room_id).await?;
1616
1617        // Remove the room in the event cache store too.
1618        self.event_cache_store().lock().await?.remove_room(room_id).await?;
1619
1620        Ok(())
1621    }
1622
1623    /// Get the olm machine.
1624    #[cfg(feature = "e2e-encryption")]
1625    pub async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
1626        self.olm_machine.read().await
1627    }
1628
1629    /// Get the push rules.
1630    ///
1631    /// Gets the push rules previously processed, otherwise get them from the
1632    /// store. As a fallback, uses [`Ruleset::server_default`] if the user
1633    /// is logged in.
1634    pub(crate) async fn get_push_rules(
1635        &self,
1636        account_data_processor: &AccountDataProcessor,
1637    ) -> Result<Ruleset> {
1638        if let Some(event) = account_data_processor
1639            .push_rules()
1640            .and_then(|ev| ev.deserialize_as::<PushRulesEvent>().ok())
1641        {
1642            Ok(event.content.global)
1643        } else if let Some(event) = self
1644            .state_store
1645            .get_account_data_event_static::<PushRulesEventContent>()
1646            .await?
1647            .and_then(|ev| ev.deserialize().ok())
1648        {
1649            Ok(event.content.global)
1650        } else if let Some(session_meta) = self.state_store.session_meta() {
1651            Ok(Ruleset::server_default(&session_meta.user_id))
1652        } else {
1653            Ok(Ruleset::new())
1654        }
1655    }
1656
1657    /// Get the push context for the given room.
1658    ///
1659    /// Tries to get the data from `changes` or the up to date `room_info`.
1660    /// Loads the data from the store otherwise.
1661    ///
1662    /// Returns `None` if some data couldn't be found. This should only happen
1663    /// in brand new rooms, while we process its state.
1664    pub async fn get_push_room_context(
1665        &self,
1666        room: &Room,
1667        room_info: &RoomInfo,
1668        changes: &StateChanges,
1669    ) -> Result<Option<PushConditionRoomCtx>> {
1670        let room_id = room.room_id();
1671        let user_id = room.own_user_id();
1672
1673        let member_count = room_info.active_members_count();
1674
1675        // TODO: Use if let chain once stable
1676        let user_display_name = if let Some(AnySyncStateEvent::RoomMember(member)) =
1677            changes.state.get(room_id).and_then(|events| {
1678                events.get(&StateEventType::RoomMember)?.get(user_id.as_str())?.deserialize().ok()
1679            }) {
1680            member
1681                .as_original()
1682                .and_then(|ev| ev.content.displayname.clone())
1683                .unwrap_or_else(|| user_id.localpart().to_owned())
1684        } else if let Some(AnyStrippedStateEvent::RoomMember(member)) =
1685            changes.stripped_state.get(room_id).and_then(|events| {
1686                events.get(&StateEventType::RoomMember)?.get(user_id.as_str())?.deserialize().ok()
1687            })
1688        {
1689            member.content.displayname.unwrap_or_else(|| user_id.localpart().to_owned())
1690        } else if let Some(member) = Box::pin(room.get_member(user_id)).await? {
1691            member.name().to_owned()
1692        } else {
1693            trace!("Couldn't get push context because of missing own member information");
1694            return Ok(None);
1695        };
1696
1697        let power_levels = if let Some(event) = changes.state.get(room_id).and_then(|types| {
1698            types
1699                .get(&StateEventType::RoomPowerLevels)?
1700                .get("")?
1701                .deserialize_as::<RoomPowerLevelsEvent>()
1702                .ok()
1703        }) {
1704            Some(event.power_levels().into())
1705        } else if let Some(event) = changes.stripped_state.get(room_id).and_then(|types| {
1706            types
1707                .get(&StateEventType::RoomPowerLevels)?
1708                .get("")?
1709                .deserialize_as::<StrippedRoomPowerLevelsEvent>()
1710                .ok()
1711        }) {
1712            Some(event.power_levels().into())
1713        } else {
1714            self.state_store
1715                .get_state_event_static::<RoomPowerLevelsEventContent>(room_id)
1716                .await?
1717                .and_then(|e| e.deserialize().ok())
1718                .map(|event| event.power_levels().into())
1719        };
1720
1721        Ok(Some(PushConditionRoomCtx {
1722            user_id: user_id.to_owned(),
1723            room_id: room_id.to_owned(),
1724            member_count: UInt::new(member_count).unwrap_or(UInt::MAX),
1725            user_display_name,
1726            power_levels,
1727        }))
1728    }
1729
1730    /// Update the push context for the given room.
1731    ///
1732    /// Updates the context data from `changes` or `room_info`.
1733    pub fn update_push_room_context(
1734        &self,
1735        push_rules: &mut PushConditionRoomCtx,
1736        user_id: &UserId,
1737        room_info: &RoomInfo,
1738        changes: &StateChanges,
1739    ) {
1740        let room_id = &*room_info.room_id;
1741
1742        push_rules.member_count = UInt::new(room_info.active_members_count()).unwrap_or(UInt::MAX);
1743
1744        // TODO: Use if let chain once stable
1745        if let Some(AnySyncStateEvent::RoomMember(member)) =
1746            changes.state.get(room_id).and_then(|events| {
1747                events.get(&StateEventType::RoomMember)?.get(user_id.as_str())?.deserialize().ok()
1748            })
1749        {
1750            push_rules.user_display_name = member
1751                .as_original()
1752                .and_then(|ev| ev.content.displayname.clone())
1753                .unwrap_or_else(|| user_id.localpart().to_owned())
1754        }
1755
1756        if let Some(AnySyncStateEvent::RoomPowerLevels(event)) =
1757            changes.state.get(room_id).and_then(|types| {
1758                types.get(&StateEventType::RoomPowerLevels)?.get("")?.deserialize().ok()
1759            })
1760        {
1761            push_rules.power_levels = Some(event.power_levels().into());
1762        }
1763    }
1764
1765    /// Returns a subscriber that publishes an event every time the ignore user
1766    /// list changes
1767    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
1768        self.ignore_user_list_changes.subscribe()
1769    }
1770
1771    pub(crate) fn deserialize_state_events(
1772        raw_events: &[Raw<AnySyncStateEvent>],
1773    ) -> Vec<(Raw<AnySyncStateEvent>, AnySyncStateEvent)> {
1774        raw_events
1775            .iter()
1776            .filter_map(|raw_event| match raw_event.deserialize() {
1777                Ok(event) => Some((raw_event.clone(), event)),
1778                Err(e) => {
1779                    warn!("Couldn't deserialize state event: {e}");
1780                    None
1781                }
1782            })
1783            .collect()
1784    }
1785
1786    pub(crate) fn deserialize_stripped_state_events(
1787        raw_events: &[Raw<AnyStrippedStateEvent>],
1788    ) -> Vec<(Raw<AnyStrippedStateEvent>, AnyStrippedStateEvent)> {
1789        raw_events
1790            .iter()
1791            .filter_map(|raw_event| match raw_event.deserialize() {
1792                Ok(event) => Some((raw_event.clone(), event)),
1793                Err(e) => {
1794                    warn!("Couldn't deserialize stripped state event: {e}");
1795                    None
1796                }
1797            })
1798            .collect()
1799    }
1800
1801    /// Returns a new receiver that gets future room info notable updates.
1802    ///
1803    /// Learn more by reading the [`RoomInfoNotableUpdate`] type.
1804    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
1805        self.room_info_notable_update_sender.subscribe()
1806    }
1807}
1808
1809fn handle_room_member_event_for_profiles(
1810    room_id: &RoomId,
1811    event: &SyncStateEvent<RoomMemberEventContent>,
1812    changes: &mut StateChanges,
1813) {
1814    // Senders can fake the profile easily so we keep track of profiles that the
1815    // member set themselves to avoid having confusing profile changes when a
1816    // member gets kicked/banned.
1817    if event.state_key() == event.sender() {
1818        changes
1819            .profiles
1820            .entry(room_id.to_owned())
1821            .or_default()
1822            .insert(event.sender().to_owned(), event.into());
1823    }
1824
1825    if *event.membership() == MembershipState::Invite {
1826        // Remove any profile previously stored for the invited user.
1827        //
1828        // A room member could have joined the room and left it later; in that case, the
1829        // server may return a dummy, empty profile along the `leave` event. We
1830        // don't want to reuse that empty profile when the member has been
1831        // re-invited, so we remove it from the database.
1832        changes
1833            .profiles_to_delete
1834            .entry(room_id.to_owned())
1835            .or_default()
1836            .push(event.state_key().clone());
1837    }
1838}
1839
1840/// Represent the `required_state` values sent by a sync request.
1841///
1842/// This is useful to track what state events have been requested when handling
1843/// a response.
1844///
1845/// For example, if a sync requests the `m.room.encryption` state event, and the
1846/// server replies with nothing, if means the room **is not** encrypted. Without
1847/// knowing which state event was required by the sync, it is impossible to
1848/// interpret the absence of state event from the server as _the room's
1849/// encryption state is **not encrypted**_ or _the room's encryption state is
1850/// **unknown**_.
1851#[derive(Debug, Default)]
1852pub struct RequestedRequiredStates {
1853    default: Vec<(StateEventType, String)>,
1854    for_rooms: HashMap<OwnedRoomId, Vec<(StateEventType, String)>>,
1855}
1856
1857impl RequestedRequiredStates {
1858    /// Create a new `RequestedRequiredStates`.
1859    ///
1860    /// `default` represents the `required_state` value for all rooms.
1861    /// `for_rooms` is the `required_state` per room.
1862    pub fn new(
1863        default: Vec<(StateEventType, String)>,
1864        for_rooms: HashMap<OwnedRoomId, Vec<(StateEventType, String)>>,
1865    ) -> Self {
1866        Self { default, for_rooms }
1867    }
1868
1869    /// Get the `required_state` value for a specific room.
1870    pub fn for_room(&self, room_id: &RoomId) -> &[(StateEventType, String)] {
1871        self.for_rooms.get(room_id).unwrap_or(&self.default)
1872    }
1873}
1874
1875impl From<&v5::Request> for RequestedRequiredStates {
1876    fn from(request: &v5::Request) -> Self {
1877        // The following information is missing in the MSC4186 at the time of writing
1878        // (2025-03-12) but: the `required_state`s from all lists and from all room
1879        // subscriptions are combined by doing an union.
1880        //
1881        // Thus, we can do the same here, put the union in `default` and keep
1882        // `for_rooms` empty. The `Self::for_room` will automatically do the fallback.
1883        let mut default = BTreeSet::new();
1884
1885        for list in request.lists.values() {
1886            default.extend(BTreeSet::from_iter(list.room_details.required_state.iter().cloned()));
1887        }
1888
1889        for room_subscription in request.room_subscriptions.values() {
1890            default.extend(BTreeSet::from_iter(room_subscription.required_state.iter().cloned()));
1891        }
1892
1893        Self { default: default.into_iter().collect(), for_rooms: HashMap::new() }
1894    }
1895}
1896
1897#[cfg(test)]
1898mod tests {
1899    use std::collections::HashMap;
1900
1901    use assert_matches2::assert_let;
1902    use futures_util::FutureExt as _;
1903    use matrix_sdk_test::{
1904        async_test, event_factory::EventFactory, ruma_response_from_json, InvitedRoomBuilder,
1905        LeftRoomBuilder, StateTestEvent, StrippedStateTestEvent, SyncResponseBuilder, BOB,
1906    };
1907    use ruma::{
1908        api::client::{self as api, sync::sync_events::v5},
1909        event_id,
1910        events::{room::member::MembershipState, StateEventType},
1911        room_id,
1912        serde::Raw,
1913        user_id,
1914    };
1915    use serde_json::{json, value::to_raw_value};
1916
1917    use super::{BaseClient, RequestedRequiredStates};
1918    use crate::{
1919        store::{StateStoreExt, StoreConfig},
1920        test_utils::logged_in_base_client,
1921        RoomDisplayName, RoomState, SessionMeta,
1922    };
1923
1924    #[test]
1925    fn test_requested_required_states() {
1926        let room_id_0 = room_id!("!r0");
1927        let room_id_1 = room_id!("!r1");
1928
1929        let requested_required_states = RequestedRequiredStates::new(
1930            vec![(StateEventType::RoomAvatar, "".to_owned())],
1931            HashMap::from([(
1932                room_id_0.to_owned(),
1933                vec![
1934                    (StateEventType::RoomMember, "foo".to_owned()),
1935                    (StateEventType::RoomEncryption, "".to_owned()),
1936                ],
1937            )]),
1938        );
1939
1940        // A special set of state events exists for `room_id_0`.
1941        assert_eq!(
1942            requested_required_states.for_room(room_id_0),
1943            &[
1944                (StateEventType::RoomMember, "foo".to_owned()),
1945                (StateEventType::RoomEncryption, "".to_owned()),
1946            ]
1947        );
1948
1949        // No special list for `room_id_1`, it should return the defaults.
1950        assert_eq!(
1951            requested_required_states.for_room(room_id_1),
1952            &[(StateEventType::RoomAvatar, "".to_owned()),]
1953        );
1954    }
1955
1956    #[test]
1957    fn test_requested_required_states_from_sync_v5_request() {
1958        let room_id_0 = room_id!("!r0");
1959        let room_id_1 = room_id!("!r1");
1960
1961        // Empty request.
1962        let mut request = v5::Request::new();
1963
1964        {
1965            let requested_required_states = RequestedRequiredStates::from(&request);
1966
1967            assert!(requested_required_states.default.is_empty());
1968            assert!(requested_required_states.for_rooms.is_empty());
1969        }
1970
1971        // One list.
1972        request.lists.insert("foo".to_owned(), {
1973            let mut list = v5::request::List::default();
1974            list.room_details.required_state = vec![
1975                (StateEventType::RoomAvatar, "".to_owned()),
1976                (StateEventType::RoomEncryption, "".to_owned()),
1977            ];
1978
1979            list
1980        });
1981
1982        {
1983            let requested_required_states = RequestedRequiredStates::from(&request);
1984
1985            assert_eq!(
1986                requested_required_states.default,
1987                &[
1988                    (StateEventType::RoomAvatar, "".to_owned()),
1989                    (StateEventType::RoomEncryption, "".to_owned())
1990                ]
1991            );
1992            assert!(requested_required_states.for_rooms.is_empty());
1993        }
1994
1995        // Two lists.
1996        request.lists.insert("bar".to_owned(), {
1997            let mut list = v5::request::List::default();
1998            list.room_details.required_state = vec![
1999                (StateEventType::RoomEncryption, "".to_owned()),
2000                (StateEventType::RoomName, "".to_owned()),
2001            ];
2002
2003            list
2004        });
2005
2006        {
2007            let requested_required_states = RequestedRequiredStates::from(&request);
2008
2009            // Union of the state events.
2010            assert_eq!(
2011                requested_required_states.default,
2012                &[
2013                    (StateEventType::RoomAvatar, "".to_owned()),
2014                    (StateEventType::RoomEncryption, "".to_owned()),
2015                    (StateEventType::RoomName, "".to_owned()),
2016                ]
2017            );
2018            assert!(requested_required_states.for_rooms.is_empty());
2019        }
2020
2021        // One room subscription.
2022        request.room_subscriptions.insert(room_id_0.to_owned(), {
2023            let mut room_subscription = v5::request::RoomSubscription::default();
2024
2025            room_subscription.required_state = vec![
2026                (StateEventType::RoomJoinRules, "".to_owned()),
2027                (StateEventType::RoomEncryption, "".to_owned()),
2028            ];
2029
2030            room_subscription
2031        });
2032
2033        {
2034            let requested_required_states = RequestedRequiredStates::from(&request);
2035
2036            // Union of state events, all in `default`, still nothing in `for_rooms`.
2037            assert_eq!(
2038                requested_required_states.default,
2039                &[
2040                    (StateEventType::RoomAvatar, "".to_owned()),
2041                    (StateEventType::RoomEncryption, "".to_owned()),
2042                    (StateEventType::RoomJoinRules, "".to_owned()),
2043                    (StateEventType::RoomName, "".to_owned()),
2044                ]
2045            );
2046            assert!(requested_required_states.for_rooms.is_empty());
2047        }
2048
2049        // Two room subscriptions.
2050        request.room_subscriptions.insert(room_id_1.to_owned(), {
2051            let mut room_subscription = v5::request::RoomSubscription::default();
2052
2053            room_subscription.required_state = vec![
2054                (StateEventType::RoomName, "".to_owned()),
2055                (StateEventType::RoomTopic, "".to_owned()),
2056            ];
2057
2058            room_subscription
2059        });
2060
2061        {
2062            let requested_required_states = RequestedRequiredStates::from(&request);
2063
2064            // Union of state events, all in `default`, still nothing in `for_rooms`.
2065            assert_eq!(
2066                requested_required_states.default,
2067                &[
2068                    (StateEventType::RoomAvatar, "".to_owned()),
2069                    (StateEventType::RoomEncryption, "".to_owned()),
2070                    (StateEventType::RoomJoinRules, "".to_owned()),
2071                    (StateEventType::RoomName, "".to_owned()),
2072                    (StateEventType::RoomTopic, "".to_owned()),
2073                ]
2074            );
2075        }
2076    }
2077
2078    #[async_test]
2079    async fn test_invite_after_leaving() {
2080        let user_id = user_id!("@alice:example.org");
2081        let room_id = room_id!("!test:example.org");
2082
2083        let client = logged_in_base_client(Some(user_id)).await;
2084
2085        let mut sync_builder = SyncResponseBuilder::new();
2086
2087        let response = sync_builder
2088            .add_left_room(
2089                LeftRoomBuilder::new(room_id).add_timeline_event(
2090                    EventFactory::new()
2091                        .member(user_id)
2092                        .membership(MembershipState::Leave)
2093                        .display_name("Alice")
2094                        .event_id(event_id!("$994173582443PhrSn:example.org")),
2095                ),
2096            )
2097            .build_sync_response();
2098        client.receive_sync_response(response).await.unwrap();
2099        assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left);
2100
2101        let response = sync_builder
2102            .add_invited_room(InvitedRoomBuilder::new(room_id).add_state_event(
2103                StrippedStateTestEvent::Custom(json!({
2104                    "content": {
2105                        "displayname": "Alice",
2106                        "membership": "invite",
2107                    },
2108                    "event_id": "$143273582443PhrSn:example.org",
2109                    "origin_server_ts": 1432735824653u64,
2110                    "sender": "@example:example.org",
2111                    "state_key": user_id,
2112                    "type": "m.room.member",
2113                })),
2114            ))
2115            .build_sync_response();
2116        client.receive_sync_response(response).await.unwrap();
2117        assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Invited);
2118    }
2119
2120    #[async_test]
2121    async fn test_invite_displayname() {
2122        let user_id = user_id!("@alice:example.org");
2123        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
2124
2125        let client = logged_in_base_client(Some(user_id)).await;
2126
2127        let response = ruma_response_from_json(&json!({
2128            "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
2129            "device_one_time_keys_count": {
2130                "signed_curve25519": 50u64
2131            },
2132            "device_unused_fallback_key_types": [
2133                "signed_curve25519"
2134            ],
2135            "rooms": {
2136                "invite": {
2137                    "!ithpyNKDtmhneaTQja:example.org": {
2138                        "invite_state": {
2139                            "events": [
2140                                {
2141                                    "content": {
2142                                        "creator": "@test:example.org",
2143                                        "room_version": "9"
2144                                    },
2145                                    "sender": "@test:example.org",
2146                                    "state_key": "",
2147                                    "type": "m.room.create"
2148                                },
2149                                {
2150                                    "content": {
2151                                        "join_rule": "invite"
2152                                    },
2153                                    "sender": "@test:example.org",
2154                                    "state_key": "",
2155                                    "type": "m.room.join_rules"
2156                                },
2157                                {
2158                                    "content": {
2159                                        "algorithm": "m.megolm.v1.aes-sha2"
2160                                    },
2161                                    "sender": "@test:example.org",
2162                                    "state_key": "",
2163                                    "type": "m.room.encryption"
2164                                },
2165                                {
2166                                    "content": {
2167                                        "avatar_url": "mxc://example.org/dcBBDwuWEUrjfrOchvkirUST",
2168                                        "displayname": "Kyra",
2169                                        "membership": "join"
2170                                    },
2171                                    "sender": "@test:example.org",
2172                                    "state_key": "@test:example.org",
2173                                    "type": "m.room.member"
2174                                },
2175                                {
2176                                    "content": {
2177                                        "avatar_url": "mxc://example.org/ABFEXSDrESxovWwEnCYdNcHT",
2178                                        "displayname": "alice",
2179                                        "is_direct": true,
2180                                        "membership": "invite"
2181                                    },
2182                                    "origin_server_ts": 1650878657984u64,
2183                                    "sender": "@test:example.org",
2184                                    "state_key": "@alice:example.org",
2185                                    "type": "m.room.member",
2186                                    "unsigned": {
2187                                        "age": 14u64
2188                                    },
2189                                    "event_id": "$fLDqltg9Puj-kWItLSFVHPGN4YkgpYQf2qImPzdmgrE"
2190                                }
2191                            ]
2192                        }
2193                    }
2194                }
2195            }
2196        }));
2197
2198        client.receive_sync_response(response).await.unwrap();
2199
2200        let room = client.get_room(room_id).expect("Room not found");
2201        assert_eq!(room.state(), RoomState::Invited);
2202        assert_eq!(
2203            room.compute_display_name().await.expect("fetching display name failed"),
2204            RoomDisplayName::Calculated("Kyra".to_owned())
2205        );
2206    }
2207
2208    #[cfg(feature = "e2e-encryption")]
2209    #[async_test]
2210    async fn test_when_there_are_no_latest_encrypted_events_decrypting_them_does_nothing() {
2211        use std::collections::BTreeMap;
2212
2213        use matrix_sdk_test::event_factory::EventFactory;
2214        use ruma::{event_id, events::room::member::MembershipState};
2215
2216        use crate::{rooms::normal::RoomInfoNotableUpdateReasons, StateChanges};
2217
2218        // Given a room
2219        let user_id = user_id!("@u:u.to");
2220        let room_id = room_id!("!r:u.to");
2221        let client = logged_in_base_client(Some(user_id)).await;
2222
2223        let mut sync_builder = SyncResponseBuilder::new();
2224
2225        let response = sync_builder
2226            .add_joined_room(
2227                matrix_sdk_test::JoinedRoomBuilder::new(room_id).add_timeline_event(
2228                    EventFactory::new()
2229                        .member(user_id)
2230                        .display_name("Alice")
2231                        .membership(MembershipState::Join)
2232                        .event_id(event_id!("$1")),
2233                ),
2234            )
2235            .build_sync_response();
2236        client.receive_sync_response(response).await.unwrap();
2237
2238        let room = client.get_room(room_id).expect("Just-created room not found!");
2239
2240        // Sanity: it has no latest_encrypted_events or latest_event
2241        assert!(room.latest_encrypted_events().is_empty());
2242        assert!(room.latest_event().is_none());
2243
2244        // When I tell it to do some decryption
2245        let mut changes = StateChanges::default();
2246        let mut room_info_notable_updates = BTreeMap::new();
2247        client.decrypt_latest_events(&room, &mut changes, &mut room_info_notable_updates).await;
2248
2249        // Then nothing changed
2250        assert!(room.latest_encrypted_events().is_empty());
2251        assert!(room.latest_event().is_none());
2252        assert!(changes.room_infos.is_empty());
2253        assert!(!room_info_notable_updates
2254            .get(room_id)
2255            .copied()
2256            .unwrap_or_default()
2257            .contains(RoomInfoNotableUpdateReasons::LATEST_EVENT));
2258    }
2259
2260    #[async_test]
2261    async fn test_deserialization_failure() {
2262        let user_id = user_id!("@alice:example.org");
2263        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
2264
2265        let client =
2266            BaseClient::new(StoreConfig::new("cross-process-store-locks-holder-name".to_owned()));
2267        client
2268            .activate(
2269                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
2270                #[cfg(feature = "e2e-encryption")]
2271                None,
2272            )
2273            .await
2274            .unwrap();
2275
2276        let response = ruma_response_from_json(&json!({
2277            "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
2278            "rooms": {
2279                "join": {
2280                    "!ithpyNKDtmhneaTQja:example.org": {
2281                        "state": {
2282                            "events": [
2283                                {
2284                                    "invalid": "invalid",
2285                                },
2286                                {
2287                                    "content": {
2288                                        "name": "The room name"
2289                                    },
2290                                    "event_id": "$143273582443PhrSn:example.org",
2291                                    "origin_server_ts": 1432735824653u64,
2292                                    "room_id": "!jEsUZKDJdhlrceRyVU:example.org",
2293                                    "sender": "@example:example.org",
2294                                    "state_key": "",
2295                                    "type": "m.room.name",
2296                                    "unsigned": {
2297                                        "age": 1234
2298                                    }
2299                                },
2300                            ]
2301                        }
2302                    }
2303                }
2304            }
2305        }));
2306
2307        client.receive_sync_response(response).await.unwrap();
2308        client
2309            .state_store()
2310            .get_state_event_static::<ruma::events::room::name::RoomNameEventContent>(room_id)
2311            .await
2312            .expect("Failed to fetch state event")
2313            .expect("State event not found")
2314            .deserialize()
2315            .expect("Failed to deserialize state event");
2316    }
2317
2318    #[async_test]
2319    async fn test_invited_members_arent_ignored() {
2320        let user_id = user_id!("@alice:example.org");
2321        let inviter_user_id = user_id!("@bob:example.org");
2322        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
2323
2324        let client =
2325            BaseClient::new(StoreConfig::new("cross-process-store-locks-holder-name".to_owned()));
2326        client
2327            .activate(
2328                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
2329                #[cfg(feature = "e2e-encryption")]
2330                None,
2331            )
2332            .await
2333            .unwrap();
2334
2335        // Preamble: let the SDK know about the room.
2336        let mut sync_builder = SyncResponseBuilder::new();
2337        let response = sync_builder
2338            .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id))
2339            .build_sync_response();
2340        client.receive_sync_response(response).await.unwrap();
2341
2342        // When I process the result of a /members request that only contains an invited
2343        // member,
2344        let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
2345
2346        let raw_member_event = json!({
2347            "content": {
2348                "avatar_url": "mxc://localhost/fewjilfewjil42",
2349                "displayname": "Invited Alice",
2350                "membership": "invite"
2351            },
2352            "event_id": "$151800140517rfvjc:localhost",
2353            "origin_server_ts": 151800140,
2354            "room_id": room_id,
2355            "sender": inviter_user_id,
2356            "state_key": user_id,
2357            "type": "m.room.member",
2358            "unsigned": {
2359                "age": 13374242,
2360            }
2361        });
2362        let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
2363            to_raw_value(&raw_member_event).unwrap(),
2364        )]);
2365
2366        // It's correctly processed,
2367        client.receive_all_members(room_id, &request, &response).await.unwrap();
2368
2369        let room = client.get_room(room_id).unwrap();
2370
2371        // And I can get the invited member display name and avatar.
2372        let member = room.get_member(user_id).await.expect("ok").expect("exists");
2373
2374        assert_eq!(member.user_id(), user_id);
2375        assert_eq!(member.display_name().unwrap(), "Invited Alice");
2376        assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
2377    }
2378
2379    #[async_test]
2380    async fn test_reinvited_members_get_a_display_name() {
2381        let user_id = user_id!("@alice:example.org");
2382        let inviter_user_id = user_id!("@bob:example.org");
2383        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
2384
2385        let client =
2386            BaseClient::new(StoreConfig::new("cross-process-store-locks-holder-name".to_owned()));
2387        client
2388            .activate(
2389                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
2390                #[cfg(feature = "e2e-encryption")]
2391                None,
2392            )
2393            .await
2394            .unwrap();
2395
2396        // Preamble: let the SDK know about the room, and that the invited user left it.
2397        let mut sync_builder = SyncResponseBuilder::new();
2398        let response = sync_builder
2399            .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id).add_state_event(
2400                StateTestEvent::Custom(json!({
2401                    "content": {
2402                        "avatar_url": null,
2403                        "displayname": null,
2404                        "membership": "leave"
2405                    },
2406                    "event_id": "$151803140217rkvjc:localhost",
2407                    "origin_server_ts": 151800139,
2408                    "room_id": room_id,
2409                    "sender": user_id,
2410                    "state_key": user_id,
2411                    "type": "m.room.member",
2412                })),
2413            ))
2414            .build_sync_response();
2415        client.receive_sync_response(response).await.unwrap();
2416
2417        // Now, say that the user has been re-invited.
2418        let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
2419
2420        let raw_member_event = json!({
2421            "content": {
2422                "avatar_url": "mxc://localhost/fewjilfewjil42",
2423                "displayname": "Invited Alice",
2424                "membership": "invite"
2425            },
2426            "event_id": "$151800140517rfvjc:localhost",
2427            "origin_server_ts": 151800140,
2428            "room_id": room_id,
2429            "sender": inviter_user_id,
2430            "state_key": user_id,
2431            "type": "m.room.member",
2432            "unsigned": {
2433                "age": 13374242,
2434            }
2435        });
2436        let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
2437            to_raw_value(&raw_member_event).unwrap(),
2438        )]);
2439
2440        // It's correctly processed,
2441        client.receive_all_members(room_id, &request, &response).await.unwrap();
2442
2443        let room = client.get_room(room_id).unwrap();
2444
2445        // And I can get the invited member display name and avatar.
2446        let member = room.get_member(user_id).await.expect("ok").expect("exists");
2447
2448        assert_eq!(member.user_id(), user_id);
2449        assert_eq!(member.display_name().unwrap(), "Invited Alice");
2450        assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
2451    }
2452
2453    #[async_test]
2454    async fn test_ignored_user_list_changes() {
2455        let user_id = user_id!("@alice:example.org");
2456        let client =
2457            BaseClient::new(StoreConfig::new("cross-process-store-locks-holder-name".to_owned()));
2458        client
2459            .activate(
2460                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
2461                #[cfg(feature = "e2e-encryption")]
2462                None,
2463            )
2464            .await
2465            .unwrap();
2466
2467        let mut subscriber = client.subscribe_to_ignore_user_list_changes();
2468        assert!(subscriber.next().now_or_never().is_none());
2469
2470        let mut sync_builder = SyncResponseBuilder::new();
2471        let response = sync_builder
2472            .add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
2473                json!({
2474                    "content": {
2475                        "ignored_users": {
2476                            *BOB: {}
2477                        }
2478                    },
2479                    "type": "m.ignored_user_list",
2480                }),
2481            ))
2482            .build_sync_response();
2483        client.receive_sync_response(response).await.unwrap();
2484
2485        assert_let!(Some(ignored) = subscriber.next().await);
2486        assert_eq!(ignored, [BOB.to_string()]);
2487
2488        // Receive the same response.
2489        let response = sync_builder
2490            .add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
2491                json!({
2492                    "content": {
2493                        "ignored_users": {
2494                            *BOB: {}
2495                        }
2496                    },
2497                    "type": "m.ignored_user_list",
2498                }),
2499            ))
2500            .build_sync_response();
2501        client.receive_sync_response(response).await.unwrap();
2502
2503        // No changes in the ignored list.
2504        assert!(subscriber.next().now_or_never().is_none());
2505
2506        // Now remove Bob from the ignored list.
2507        let response = sync_builder
2508            .add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
2509                json!({
2510                    "content": {
2511                        "ignored_users": {}
2512                    },
2513                    "type": "m.ignored_user_list",
2514                }),
2515            ))
2516            .build_sync_response();
2517        client.receive_sync_response(response).await.unwrap();
2518
2519        assert_let!(Some(ignored) = subscriber.next().await);
2520        assert!(ignored.is_empty());
2521    }
2522}