matrix_sdk_sqlite/
state_store.rs

1use std::{
2    borrow::Cow,
3    collections::{BTreeMap, BTreeSet, HashMap},
4    fmt, iter,
5    path::Path,
6    sync::Arc,
7};
8
9use async_trait::async_trait;
10use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
11use matrix_sdk_base::{
12    deserialized_responses::{DisplayName, RawAnySyncOrStrippedState, SyncOrStrippedState},
13    store::{
14        migration_helpers::RoomInfoV1, ChildTransactionId, DependentQueuedRequest,
15        DependentQueuedRequestKind, QueueWedgeError, QueuedRequest, QueuedRequestKind,
16        SentRequestKey,
17    },
18    MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateChanges, StateStore,
19    StateStoreDataKey, StateStoreDataValue,
20};
21use matrix_sdk_store_encryption::StoreCipher;
22use ruma::{
23    canonical_json::{redact, RedactedBecause},
24    events::{
25        presence::PresenceEvent,
26        receipt::{Receipt, ReceiptThread, ReceiptType},
27        room::{
28            create::RoomCreateEventContent,
29            member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
30        },
31        AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
32        GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
33    },
34    serde::Raw,
35    CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
36    OwnedTransactionId, OwnedUserId, RoomId, RoomVersionId, TransactionId, UInt, UserId,
37};
38use rusqlite::{OptionalExtension, Transaction};
39use serde::{de::DeserializeOwned, Deserialize, Serialize};
40use tokio::fs;
41use tracing::{debug, warn};
42
43use crate::{
44    error::{Error, Result},
45    utils::{
46        repeat_vars, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
47        SqliteKeyValueStoreConnExt,
48    },
49    OpenStoreError, SqliteStoreConfig,
50};
51
52mod keys {
53    // Tables
54    pub const KV_BLOB: &str = "kv_blob";
55    pub const ROOM_INFO: &str = "room_info";
56    pub const STATE_EVENT: &str = "state_event";
57    pub const GLOBAL_ACCOUNT_DATA: &str = "global_account_data";
58    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
59    pub const MEMBER: &str = "member";
60    pub const PROFILE: &str = "profile";
61    pub const RECEIPT: &str = "receipt";
62    pub const DISPLAY_NAME: &str = "display_name";
63    pub const SEND_QUEUE: &str = "send_queue_events";
64    pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";
65}
66
67const DATABASE_NAME: &str = "matrix-sdk-state.sqlite3";
68
69/// Identifier of the latest database version.
70///
71/// This is used to figure whether the sqlite database requires a migration.
72/// Every new SQL migration should imply a bump of this number, and changes in
73/// the [`SqliteStateStore::run_migrations`] function..
74const DATABASE_VERSION: u8 = 12;
75
76/// A sqlite based cryptostore.
77#[derive(Clone)]
78pub struct SqliteStateStore {
79    store_cipher: Option<Arc<StoreCipher>>,
80    pool: SqlitePool,
81}
82
83#[cfg(not(tarpaulin_include))]
84impl fmt::Debug for SqliteStateStore {
85    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86        f.debug_struct("SqliteStateStore").finish_non_exhaustive()
87    }
88}
89
90impl SqliteStateStore {
91    /// Open the sqlite-based state store at the given path using the given
92    /// passphrase to encrypt private data.
93    pub async fn open(
94        path: impl AsRef<Path>,
95        passphrase: Option<&str>,
96    ) -> Result<Self, OpenStoreError> {
97        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
98    }
99
100    /// Open the sqlite-based state store with the config open config.
101    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
102        let SqliteStoreConfig { path, passphrase, pool_config, runtime_config } = config;
103
104        fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;
105
106        let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
107        config.pool = Some(pool_config);
108
109        let pool = config.create_pool(Runtime::Tokio1)?;
110
111        let this = Self::open_with_pool(pool, passphrase.as_deref()).await?;
112        this.pool.get().await?.apply_runtime_config(runtime_config).await?;
113
114        Ok(this)
115    }
116
117    /// Create a sqlite-based state store using the given sqlite database pool.
118    /// The given passphrase will be used to encrypt private data.
119    async fn open_with_pool(
120        pool: SqlitePool,
121        passphrase: Option<&str>,
122    ) -> Result<Self, OpenStoreError> {
123        let conn = pool.get().await?;
124
125        let mut version = conn.db_version().await?;
126
127        if version == 0 {
128            init(&conn).await?;
129            version = 1;
130        }
131
132        let store_cipher = match passphrase {
133            Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
134            None => None,
135        };
136        let this = Self { store_cipher, pool };
137        this.run_migrations(&conn, version, None).await?;
138
139        Ok(this)
140    }
141
142    /// Run database migrations from the given `from` version to the given `to`
143    /// version
144    ///
145    /// If `to` is `None`, the current database version will be used.
146    async fn run_migrations(&self, conn: &SqliteAsyncConn, from: u8, to: Option<u8>) -> Result<()> {
147        let to = to.unwrap_or(DATABASE_VERSION);
148
149        if from < to {
150            debug!(version = from, new_version = to, "Upgrading database");
151        } else {
152            return Ok(());
153        }
154
155        if from < 2 && to >= 2 {
156            let this = self.clone();
157            conn.with_transaction(move |txn| {
158                // Create new table.
159                txn.execute_batch(include_str!(
160                    "../migrations/state_store/002_a_create_new_room_info.sql"
161                ))?;
162
163                // Migrate data to new table.
164                for data in txn
165                    .prepare("SELECT data FROM room_info")?
166                    .query_map((), |row| row.get::<_, Vec<u8>>(0))?
167                {
168                    let data = data?;
169                    let room_info: RoomInfoV1 = this.deserialize_json(&data)?;
170
171                    let room_id = this.encode_key(keys::ROOM_INFO, room_info.room_id());
172                    let state = this
173                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
174                    txn.prepare_cached(
175                        "INSERT OR REPLACE INTO new_room_info (room_id, state, data)
176                         VALUES (?, ?, ?)",
177                    )?
178                    .execute((room_id, state, data))?;
179                }
180
181                // Replace old table.
182                txn.execute_batch(include_str!(
183                    "../migrations/state_store/002_b_replace_room_info.sql"
184                ))?;
185
186                txn.set_db_version(2)?;
187                Result::<_, Error>::Ok(())
188            })
189            .await?;
190        }
191
192        // Migration to v3: RoomInfo format has changed.
193        if from < 3 && to >= 3 {
194            let this = self.clone();
195            conn.with_transaction(move |txn| {
196                // Migrate data .
197                for data in txn
198                    .prepare("SELECT data FROM room_info")?
199                    .query_map((), |row| row.get::<_, Vec<u8>>(0))?
200                {
201                    let data = data?;
202                    let room_info_v1: RoomInfoV1 = this.deserialize_json(&data)?;
203
204                    // Get the `m.room.create` event from the room state.
205                    let room_id = this.encode_key(keys::STATE_EVENT, room_info_v1.room_id());
206                    let event_type =
207                        this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
208                    let create_res = txn
209                        .prepare(
210                            "SELECT stripped, data FROM state_event
211                             WHERE room_id = ? AND event_type = ?",
212                        )?
213                        .query_row([room_id, event_type], |row| {
214                            Ok((row.get::<_, bool>(0)?, row.get::<_, Vec<u8>>(1)?))
215                        })
216                        .optional()?;
217
218                    let create = create_res.and_then(|(stripped, data)| {
219                        let create = if stripped {
220                            SyncOrStrippedState::<RoomCreateEventContent>::Stripped(
221                                this.deserialize_json(&data).ok()?,
222                            )
223                        } else {
224                            SyncOrStrippedState::Sync(this.deserialize_json(&data).ok()?)
225                        };
226                        Some(create)
227                    });
228
229                    let migrated_room_info = room_info_v1.migrate(create.as_ref());
230
231                    let data = this.serialize_json(&migrated_room_info)?;
232                    let room_id = this.encode_key(keys::ROOM_INFO, migrated_room_info.room_id());
233                    txn.prepare_cached("UPDATE room_info SET data = ? WHERE room_id = ?")?
234                        .execute((data, room_id))?;
235                }
236
237                txn.set_db_version(3)?;
238                Result::<_, Error>::Ok(())
239            })
240            .await?;
241        }
242
243        if from < 4 && to >= 4 {
244            conn.with_transaction(move |txn| {
245                // Create new table.
246                txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
247                txn.set_db_version(4)
248            })
249            .await?;
250        }
251
252        if from < 5 && to >= 5 {
253            conn.with_transaction(move |txn| {
254                // Create new table.
255                txn.execute_batch(include_str!(
256                    "../migrations/state_store/004_send_queue_with_roomid_value.sql"
257                ))?;
258                txn.set_db_version(4)
259            })
260            .await?;
261        }
262
263        if from < 6 && to >= 6 {
264            conn.with_transaction(move |txn| {
265                // Create new table.
266                txn.execute_batch(include_str!(
267                    "../migrations/state_store/005_send_queue_dependent_events.sql"
268                ))?;
269                txn.set_db_version(6)
270            })
271            .await?;
272        }
273
274        if from < 7 && to >= 7 {
275            conn.with_transaction(move |txn| {
276                // Drop media table.
277                txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
278                txn.set_db_version(7)
279            })
280            .await?;
281        }
282
283        if from < 8 && to >= 8 {
284            // Replace all existing wedged events with a generic error.
285            let error = QueueWedgeError::GenericApiError {
286                msg: "local echo failed to send in a previous session".into(),
287            };
288            let default_err = self.serialize_value(&error)?;
289
290            conn.with_transaction(move |txn| {
291                // Update send queue table to persist the wedge reason if any.
292                txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?;
293
294                // Migrate the data, add a generic error for currently wedged events
295
296                for wedged_entries in txn
297                    .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")?
298                    .query_map((), |row| {
299                        Ok(
300                            (row.get::<_, Vec<u8>>(0)?,row.get::<_, String>(1)?)
301                        )
302                    })? {
303
304                    let (room_id, transaction_id) = wedged_entries?;
305
306                    txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?
307                        .execute((default_err.clone(), room_id, transaction_id))?;
308                }
309
310
311                // Clean up the table now that data is migrated
312                txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?;
313
314                txn.set_db_version(8)
315            })
316                .await?;
317        }
318
319        if from < 9 && to >= 9 {
320            conn.with_transaction(move |txn| {
321                // Run the migration.
322                txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?;
323                txn.set_db_version(9)
324            })
325            .await?;
326        }
327
328        if from < 10 && to >= 10 {
329            conn.with_transaction(move |txn| {
330                // Run the migration.
331                txn.execute_batch(include_str!(
332                    "../migrations/state_store/009_send_queue_priority.sql"
333                ))?;
334                txn.set_db_version(10)
335            })
336            .await?;
337        }
338
339        if from < 11 && to >= 11 {
340            conn.with_transaction(move |txn| {
341                // Run the migration.
342                txn.execute_batch(include_str!(
343                    "../migrations/state_store/010_send_queue_enqueue_time.sql"
344                ))?;
345                txn.set_db_version(11)
346            })
347            .await?;
348        }
349
350        if from < 12 && to >= 12 {
351            // Defragment the DB and optimize its size on the filesystem.
352            // This should have been run in the migration for version 7, to reduce the size
353            // of the DB as we removed the media cache.
354            conn.vacuum().await?;
355            conn.set_kv("version", vec![12]).await?;
356        }
357
358        Ok(())
359    }
360
361    fn encode_value(&self, value: Vec<u8>) -> Result<Vec<u8>> {
362        if let Some(key) = &self.store_cipher {
363            let encrypted = key.encrypt_value_data(value)?;
364            Ok(rmp_serde::to_vec_named(&encrypted)?)
365        } else {
366            Ok(value)
367        }
368    }
369
370    fn serialize_value(&self, value: &impl Serialize) -> Result<Vec<u8>> {
371        let serialized = rmp_serde::to_vec_named(value)?;
372        self.encode_value(serialized)
373    }
374
375    fn serialize_json(&self, value: &impl Serialize) -> Result<Vec<u8>> {
376        let serialized = serde_json::to_vec(value)?;
377        self.encode_value(serialized)
378    }
379
380    fn decode_value<'a>(&self, value: &'a [u8]) -> Result<Cow<'a, [u8]>> {
381        if let Some(key) = &self.store_cipher {
382            let encrypted = rmp_serde::from_slice(value)?;
383            let decrypted = key.decrypt_value_data(encrypted)?;
384            Ok(Cow::Owned(decrypted))
385        } else {
386            Ok(Cow::Borrowed(value))
387        }
388    }
389
390    fn deserialize_json<T: DeserializeOwned>(&self, data: &[u8]) -> Result<T> {
391        let decoded = self.decode_value(data)?;
392        Ok(serde_json::from_slice(&decoded)?)
393    }
394
395    fn deserialize_value<T: DeserializeOwned>(&self, value: &[u8]) -> Result<T> {
396        let decoded = self.decode_value(value)?;
397        Ok(rmp_serde::from_slice(&decoded)?)
398    }
399
400    fn encode_key(&self, table_name: &str, key: impl AsRef<[u8]>) -> Key {
401        let bytes = key.as_ref();
402        if let Some(store_cipher) = &self.store_cipher {
403            Key::Hashed(store_cipher.hash_key(table_name, bytes))
404        } else {
405            Key::Plain(bytes.to_owned())
406        }
407    }
408
409    fn encode_state_store_data_key(&self, key: StateStoreDataKey<'_>) -> Key {
410        let key_s = match key {
411            StateStoreDataKey::SyncToken => Cow::Borrowed(StateStoreDataKey::SYNC_TOKEN),
412            StateStoreDataKey::ServerCapabilities => {
413                Cow::Borrowed(StateStoreDataKey::SERVER_CAPABILITIES)
414            }
415            StateStoreDataKey::Filter(f) => {
416                Cow::Owned(format!("{}:{f}", StateStoreDataKey::FILTER))
417            }
418            StateStoreDataKey::UserAvatarUrl(u) => {
419                Cow::Owned(format!("{}:{u}", StateStoreDataKey::USER_AVATAR_URL))
420            }
421            StateStoreDataKey::RecentlyVisitedRooms(b) => {
422                Cow::Owned(format!("{}:{b}", StateStoreDataKey::RECENTLY_VISITED_ROOMS))
423            }
424            StateStoreDataKey::UtdHookManagerData => {
425                Cow::Borrowed(StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
426            }
427            StateStoreDataKey::ComposerDraft(room_id) => {
428                Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::COMPOSER_DRAFT))
429            }
430            StateStoreDataKey::SeenKnockRequests(room_id) => {
431                Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::SEEN_KNOCK_REQUESTS))
432            }
433        };
434
435        self.encode_key(keys::KV_BLOB, &*key_s)
436    }
437
438    fn encode_presence_key(&self, user_id: &UserId) -> Key {
439        self.encode_key(keys::KV_BLOB, format!("presence:{user_id}"))
440    }
441
442    fn encode_custom_key(&self, key: &[u8]) -> Key {
443        let mut full_key = b"custom:".to_vec();
444        full_key.extend(key);
445        self.encode_key(keys::KV_BLOB, full_key)
446    }
447
448    async fn acquire(&self) -> Result<SqliteAsyncConn> {
449        Ok(self.pool.get().await?)
450    }
451
452    fn remove_maybe_stripped_room_data(
453        &self,
454        txn: &Transaction<'_>,
455        room_id: &RoomId,
456        stripped: bool,
457    ) -> rusqlite::Result<()> {
458        let state_event_room_id = self.encode_key(keys::STATE_EVENT, room_id);
459        txn.remove_room_state_events(&state_event_room_id, Some(stripped))?;
460
461        let member_room_id = self.encode_key(keys::MEMBER, room_id);
462        txn.remove_room_members(&member_room_id, Some(stripped))
463    }
464}
465
466/// Initialize the database.
467async fn init(conn: &SqliteAsyncConn) -> Result<()> {
468    // First turn on WAL mode, this can't be done in the transaction, it fails with
469    // the error message: "cannot change into wal mode from within a transaction".
470    conn.execute_batch("PRAGMA journal_mode = wal;").await?;
471    conn.with_transaction(|txn| {
472        txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
473        txn.set_db_version(1)?;
474
475        Ok(())
476    })
477    .await
478}
479
480trait SqliteConnectionStateStoreExt {
481    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()>;
482
483    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()>;
484
485    fn set_room_account_data(
486        &self,
487        room_id: &[u8],
488        event_type: &[u8],
489        data: &[u8],
490    ) -> rusqlite::Result<()>;
491    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()>;
492
493    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()>;
494    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>>;
495    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()>;
496
497    fn set_state_event(
498        &self,
499        room_id: &[u8],
500        event_type: &[u8],
501        state_key: &[u8],
502        stripped: bool,
503        event_id: Option<&[u8]>,
504        data: &[u8],
505    ) -> rusqlite::Result<()>;
506    fn get_state_event_by_id(
507        &self,
508        room_id: &[u8],
509        event_id: &[u8],
510    ) -> rusqlite::Result<Option<Vec<u8>>>;
511    fn remove_room_state_events(
512        &self,
513        room_id: &[u8],
514        stripped: Option<bool>,
515    ) -> rusqlite::Result<()>;
516
517    fn set_member(
518        &self,
519        room_id: &[u8],
520        user_id: &[u8],
521        membership: &[u8],
522        stripped: bool,
523        data: &[u8],
524    ) -> rusqlite::Result<()>;
525    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()>;
526
527    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
528    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()>;
529    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()>;
530
531    fn set_receipt(
532        &self,
533        room_id: &[u8],
534        user_id: &[u8],
535        receipt_type: &[u8],
536        thread_id: &[u8],
537        event_id: &[u8],
538        data: &[u8],
539    ) -> rusqlite::Result<()>;
540    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()>;
541
542    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()>;
543    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()>;
544    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()>;
545    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
546    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
547}
548
549impl SqliteConnectionStateStoreExt for rusqlite::Connection {
550    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()> {
551        self.execute("INSERT OR REPLACE INTO kv_blob VALUES (?, ?)", (key, value))?;
552        Ok(())
553    }
554
555    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()> {
556        self.prepare_cached(
557            "INSERT OR REPLACE INTO global_account_data (event_type, data)
558             VALUES (?, ?)",
559        )?
560        .execute((event_type, data))?;
561        Ok(())
562    }
563
564    fn set_room_account_data(
565        &self,
566        room_id: &[u8],
567        event_type: &[u8],
568        data: &[u8],
569    ) -> rusqlite::Result<()> {
570        self.prepare_cached(
571            "INSERT OR REPLACE INTO room_account_data (room_id, event_type, data)
572             VALUES (?, ?, ?)",
573        )?
574        .execute((room_id, event_type, data))?;
575        Ok(())
576    }
577
578    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()> {
579        self.prepare(
580            "DELETE FROM room_account_data
581             WHERE room_id = ?",
582        )?
583        .execute((room_id,))?;
584        Ok(())
585    }
586
587    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()> {
588        self.prepare_cached(
589            "INSERT OR REPLACE INTO room_info (room_id, state, data)
590             VALUES (?, ?, ?)",
591        )?
592        .execute((room_id, state, data))?;
593        Ok(())
594    }
595
596    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>> {
597        self.query_row("SELECT data FROM room_info WHERE room_id = ?", (room_id,), |row| row.get(0))
598            .optional()
599    }
600
601    /// Remove the room info for the given room.
602    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()> {
603        self.prepare_cached("DELETE FROM room_info WHERE room_id = ?")?.execute((room_id,))?;
604        Ok(())
605    }
606
607    fn set_state_event(
608        &self,
609        room_id: &[u8],
610        event_type: &[u8],
611        state_key: &[u8],
612        stripped: bool,
613        event_id: Option<&[u8]>,
614        data: &[u8],
615    ) -> rusqlite::Result<()> {
616        self.prepare_cached(
617            "INSERT OR REPLACE
618             INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
619             VALUES (?, ?, ?, ?, ?, ?)",
620        )?
621        .execute((room_id, event_type, state_key, stripped, event_id, data))?;
622        Ok(())
623    }
624
625    fn get_state_event_by_id(
626        &self,
627        room_id: &[u8],
628        event_id: &[u8],
629    ) -> rusqlite::Result<Option<Vec<u8>>> {
630        self.query_row(
631            "SELECT data FROM state_event WHERE room_id = ? AND event_id = ?",
632            (room_id, event_id),
633            |row| row.get(0),
634        )
635        .optional()
636    }
637
638    /// Remove state events for the given room.
639    ///
640    /// If `stripped` is `Some()`, only removes state events for the given
641    /// stripped state. Otherwise, state events are removed regardless of the
642    /// stripped state.
643    fn remove_room_state_events(
644        &self,
645        room_id: &[u8],
646        stripped: Option<bool>,
647    ) -> rusqlite::Result<()> {
648        if let Some(stripped) = stripped {
649            self.prepare_cached("DELETE FROM state_event WHERE room_id = ? AND stripped = ?")?
650                .execute((room_id, stripped))?;
651        } else {
652            self.prepare_cached("DELETE FROM state_event WHERE room_id = ?")?
653                .execute((room_id,))?;
654        }
655        Ok(())
656    }
657
658    fn set_member(
659        &self,
660        room_id: &[u8],
661        user_id: &[u8],
662        membership: &[u8],
663        stripped: bool,
664        data: &[u8],
665    ) -> rusqlite::Result<()> {
666        self.prepare_cached(
667            "INSERT OR REPLACE
668             INTO member (room_id, user_id, membership, stripped, data)
669             VALUES (?, ?, ?, ?, ?)",
670        )?
671        .execute((room_id, user_id, membership, stripped, data))?;
672        Ok(())
673    }
674
675    /// Remove members for the given room.
676    ///
677    /// If `stripped` is `Some()`, only removes members for the given stripped
678    /// state. Otherwise, members are removed regardless of the stripped state.
679    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()> {
680        if let Some(stripped) = stripped {
681            self.prepare_cached("DELETE FROM member WHERE room_id = ? AND stripped = ?")?
682                .execute((room_id, stripped))?;
683        } else {
684            self.prepare_cached("DELETE FROM member WHERE room_id = ?")?.execute((room_id,))?;
685        }
686        Ok(())
687    }
688
689    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
690        self.prepare_cached(
691            "INSERT OR REPLACE
692             INTO profile (room_id, user_id, data)
693             VALUES (?, ?, ?)",
694        )?
695        .execute((room_id, user_id, data))?;
696        Ok(())
697    }
698
699    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()> {
700        self.prepare("DELETE FROM profile WHERE room_id = ?")?.execute((room_id,))?;
701        Ok(())
702    }
703
704    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()> {
705        self.prepare("DELETE FROM profile WHERE room_id = ? AND user_id = ?")?
706            .execute((room_id, user_id))?;
707        Ok(())
708    }
709
710    fn set_receipt(
711        &self,
712        room_id: &[u8],
713        user_id: &[u8],
714        receipt_type: &[u8],
715        thread: &[u8],
716        event_id: &[u8],
717        data: &[u8],
718    ) -> rusqlite::Result<()> {
719        self.prepare_cached(
720            "INSERT OR REPLACE
721             INTO receipt (room_id, user_id, receipt_type, thread, event_id, data)
722             VALUES (?, ?, ?, ?, ?, ?)",
723        )?
724        .execute((room_id, user_id, receipt_type, thread, event_id, data))?;
725        Ok(())
726    }
727
728    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()> {
729        self.prepare("DELETE FROM receipt WHERE room_id = ?")?.execute((room_id,))?;
730        Ok(())
731    }
732
733    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
734        self.prepare_cached(
735            "INSERT OR REPLACE
736             INTO display_name (room_id, name, data)
737             VALUES (?, ?, ?)",
738        )?
739        .execute((room_id, name, data))?;
740        Ok(())
741    }
742
743    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()> {
744        self.prepare("DELETE FROM display_name WHERE room_id = ? AND name = ?")?
745            .execute((room_id, name))?;
746        Ok(())
747    }
748
749    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()> {
750        self.prepare("DELETE FROM display_name WHERE room_id = ?")?.execute((room_id,))?;
751        Ok(())
752    }
753
754    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
755        self.prepare("DELETE FROM send_queue_events WHERE room_id = ?")?.execute((room_id,))?;
756        Ok(())
757    }
758
759    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
760        self.prepare("DELETE FROM dependent_send_queue_events WHERE room_id = ?")?
761            .execute((room_id,))?;
762        Ok(())
763    }
764}
765
766#[async_trait]
767trait SqliteObjectStateStoreExt: SqliteAsyncConnExt {
768    async fn get_kv_blob(&self, key: Key) -> Result<Option<Vec<u8>>> {
769        Ok(self
770            .query_row("SELECT value FROM kv_blob WHERE key = ?", (key,), |row| row.get(0))
771            .await
772            .optional()?)
773    }
774
775    async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
776        let keys_length = keys.len();
777
778        self.chunk_large_query_over(keys, Some(keys_length), |txn, keys| {
779            let sql_params = repeat_vars(keys.len());
780            let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");
781
782            let params = rusqlite::params_from_iter(keys);
783
784            Ok(txn
785                .prepare(&sql)?
786                .query(params)?
787                .mapped(|row| row.get(0))
788                .collect::<Result<_, _>>()?)
789        })
790        .await
791    }
792
793    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;
794
795    async fn delete_kv_blob(&self, key: Key) -> Result<()> {
796        self.execute("DELETE FROM kv_blob WHERE key = ?", (key,)).await?;
797        Ok(())
798    }
799
800    async fn get_room_infos(&self) -> Result<Vec<Vec<u8>>> {
801        Ok(self
802            .prepare("SELECT data FROM room_info", move |mut stmt| {
803                stmt.query_map((), |row| row.get(0))?.collect()
804            })
805            .await?)
806    }
807
808    async fn get_maybe_stripped_state_events_for_keys(
809        &self,
810        room_id: Key,
811        event_type: Key,
812        state_keys: Vec<Key>,
813    ) -> Result<Vec<(bool, Vec<u8>)>> {
814        self.chunk_large_query_over(state_keys, None, move |txn, state_keys: Vec<Key>| {
815            let sql_params = repeat_vars(state_keys.len());
816            let sql = format!(
817                "SELECT stripped, data FROM state_event
818                 WHERE room_id = ? AND event_type = ? AND state_key IN ({sql_params})"
819            );
820
821            let params = rusqlite::params_from_iter(
822                [room_id.clone(), event_type.clone()].into_iter().chain(state_keys),
823            );
824
825            Ok(txn
826                .prepare(&sql)?
827                .query(params)?
828                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
829                .collect::<Result<_, _>>()?)
830        })
831        .await
832    }
833
834    async fn get_maybe_stripped_state_events(
835        &self,
836        room_id: Key,
837        event_type: Key,
838    ) -> Result<Vec<(bool, Vec<u8>)>> {
839        Ok(self
840            .prepare(
841                "SELECT stripped, data FROM state_event
842                 WHERE room_id = ? AND event_type = ?",
843                |mut stmt| {
844                    stmt.query((room_id, event_type))?
845                        .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
846                        .collect()
847                },
848            )
849            .await?)
850    }
851
852    async fn get_profiles(
853        &self,
854        room_id: Key,
855        user_ids: Vec<Key>,
856    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
857        let user_ids_length = user_ids.len();
858
859        self.chunk_large_query_over(user_ids, Some(user_ids_length), move |txn, user_ids| {
860            let sql_params = repeat_vars(user_ids.len());
861            let sql = format!(
862                "SELECT user_id, data FROM profile WHERE room_id = ? AND user_id IN ({sql_params})"
863            );
864
865            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(user_ids));
866
867            Ok(txn
868                .prepare(&sql)?
869                .query(params)?
870                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
871                .collect::<Result<_, _>>()?)
872        })
873        .await
874    }
875
876    async fn get_user_ids(&self, room_id: Key, memberships: Vec<Key>) -> Result<Vec<Vec<u8>>> {
877        let res = if memberships.is_empty() {
878            self.prepare("SELECT data FROM member WHERE room_id = ?", |mut stmt| {
879                stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
880            })
881            .await?
882        } else {
883            self.chunk_large_query_over(memberships, None, move |txn, memberships| {
884                let sql_params = repeat_vars(memberships.len());
885                let sql = format!(
886                    "SELECT data FROM member WHERE room_id = ? AND membership IN ({sql_params})"
887                );
888
889                let params =
890                    rusqlite::params_from_iter(iter::once(room_id.clone()).chain(memberships));
891
892                Ok(txn
893                    .prepare(&sql)?
894                    .query(params)?
895                    .mapped(|row| row.get(0))
896                    .collect::<Result<_, _>>()?)
897            })
898            .await?
899        };
900
901        Ok(res)
902    }
903
904    async fn get_global_account_data(&self, event_type: Key) -> Result<Option<Vec<u8>>> {
905        Ok(self
906            .query_row(
907                "SELECT data FROM global_account_data WHERE event_type = ?",
908                (event_type,),
909                |row| row.get(0),
910            )
911            .await
912            .optional()?)
913    }
914
915    async fn get_room_account_data(
916        &self,
917        room_id: Key,
918        event_type: Key,
919    ) -> Result<Option<Vec<u8>>> {
920        Ok(self
921            .query_row(
922                "SELECT data FROM room_account_data WHERE room_id = ? AND event_type = ?",
923                (room_id, event_type),
924                |row| row.get(0),
925            )
926            .await
927            .optional()?)
928    }
929
930    async fn get_display_names(
931        &self,
932        room_id: Key,
933        names: Vec<Key>,
934    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
935        let names_length = names.len();
936
937        self.chunk_large_query_over(names, Some(names_length), move |txn, names| {
938            let sql_params = repeat_vars(names.len());
939            let sql = format!(
940                "SELECT name, data FROM display_name WHERE room_id = ? AND name IN ({sql_params})"
941            );
942
943            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(names));
944
945            Ok(txn
946                .prepare(&sql)?
947                .query(params)?
948                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
949                .collect::<Result<_, _>>()?)
950        })
951        .await
952    }
953
954    async fn get_user_receipt(
955        &self,
956        room_id: Key,
957        receipt_type: Key,
958        thread: Key,
959        user_id: Key,
960    ) -> Result<Option<Vec<u8>>> {
961        Ok(self
962            .query_row(
963                "SELECT data FROM receipt
964                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and user_id = ?",
965                (room_id, receipt_type, thread, user_id),
966                |row| row.get(0),
967            )
968            .await
969            .optional()?)
970    }
971
972    async fn get_event_receipts(
973        &self,
974        room_id: Key,
975        receipt_type: Key,
976        thread: Key,
977        event_id: Key,
978    ) -> Result<Vec<Vec<u8>>> {
979        Ok(self
980            .prepare(
981                "SELECT data FROM receipt
982                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and event_id = ?",
983                |mut stmt| {
984                    stmt.query((room_id, receipt_type, thread, event_id))?
985                        .mapped(|row| row.get(0))
986                        .collect()
987                },
988            )
989            .await?)
990    }
991}
992
993#[async_trait]
994impl SqliteObjectStateStoreExt for SqliteAsyncConn {
995    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()> {
996        Ok(self.interact(move |conn| conn.set_kv_blob(&key, &value)).await.unwrap()?)
997    }
998}
999
1000#[async_trait]
1001impl StateStore for SqliteStateStore {
1002    type Error = Error;
1003
1004    async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
1005        self.acquire()
1006            .await?
1007            .get_kv_blob(self.encode_state_store_data_key(key))
1008            .await?
1009            .map(|data| {
1010                Ok(match key {
1011                    StateStoreDataKey::SyncToken => {
1012                        StateStoreDataValue::SyncToken(self.deserialize_value(&data)?)
1013                    }
1014                    StateStoreDataKey::ServerCapabilities => {
1015                        StateStoreDataValue::ServerCapabilities(self.deserialize_value(&data)?)
1016                    }
1017                    StateStoreDataKey::Filter(_) => {
1018                        StateStoreDataValue::Filter(self.deserialize_value(&data)?)
1019                    }
1020                    StateStoreDataKey::UserAvatarUrl(_) => {
1021                        StateStoreDataValue::UserAvatarUrl(self.deserialize_value(&data)?)
1022                    }
1023                    StateStoreDataKey::RecentlyVisitedRooms(_) => {
1024                        StateStoreDataValue::RecentlyVisitedRooms(self.deserialize_value(&data)?)
1025                    }
1026                    StateStoreDataKey::UtdHookManagerData => {
1027                        StateStoreDataValue::UtdHookManagerData(self.deserialize_value(&data)?)
1028                    }
1029                    StateStoreDataKey::ComposerDraft(_) => {
1030                        StateStoreDataValue::ComposerDraft(self.deserialize_value(&data)?)
1031                    }
1032                    StateStoreDataKey::SeenKnockRequests(_) => {
1033                        StateStoreDataValue::SeenKnockRequests(self.deserialize_value(&data)?)
1034                    }
1035                })
1036            })
1037            .transpose()
1038    }
1039
1040    async fn set_kv_data(
1041        &self,
1042        key: StateStoreDataKey<'_>,
1043        value: StateStoreDataValue,
1044    ) -> Result<()> {
1045        let serialized_value = match key {
1046            StateStoreDataKey::SyncToken => self.serialize_value(
1047                &value.into_sync_token().expect("Session data not a sync token"),
1048            )?,
1049            StateStoreDataKey::ServerCapabilities => self.serialize_value(
1050                &value
1051                    .into_server_capabilities()
1052                    .expect("Session data not containing server capabilities"),
1053            )?,
1054            StateStoreDataKey::Filter(_) => {
1055                self.serialize_value(&value.into_filter().expect("Session data not a filter"))?
1056            }
1057            StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
1058                &value.into_user_avatar_url().expect("Session data not an user avatar url"),
1059            )?,
1060            StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
1061                &value.into_recently_visited_rooms().expect("Session data not breadcrumbs"),
1062            )?,
1063            StateStoreDataKey::UtdHookManagerData => self.serialize_value(
1064                &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
1065            )?,
1066            StateStoreDataKey::ComposerDraft(_) => self.serialize_value(
1067                &value.into_composer_draft().expect("Session data not a composer draft"),
1068            )?,
1069            StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
1070                &value
1071                    .into_seen_knock_requests()
1072                    .expect("Session data is not a set of seen knock request ids"),
1073            )?,
1074        };
1075
1076        self.acquire()
1077            .await?
1078            .set_kv_blob(self.encode_state_store_data_key(key), serialized_value)
1079            .await
1080    }
1081
1082    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
1083        self.acquire().await?.delete_kv_blob(self.encode_state_store_data_key(key)).await
1084    }
1085
1086    async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
1087        let changes = changes.to_owned();
1088        let this = self.clone();
1089        self.acquire()
1090            .await?
1091            .with_transaction(move |txn| {
1092                let StateChanges {
1093                    sync_token,
1094                    account_data,
1095                    presence,
1096                    profiles,
1097                    profiles_to_delete,
1098                    state,
1099                    room_account_data,
1100                    room_infos,
1101                    receipts,
1102                    redactions,
1103                    stripped_state,
1104                    ambiguity_maps,
1105                } = changes;
1106
1107                if let Some(sync_token) = sync_token {
1108                    let key = this.encode_state_store_data_key(StateStoreDataKey::SyncToken);
1109                    let value = this.serialize_value(&sync_token)?;
1110                    txn.set_kv_blob(&key, &value)?;
1111                }
1112
1113                for (event_type, event) in account_data {
1114                    let event_type =
1115                        this.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1116                    let data = this.serialize_json(&event)?;
1117                    txn.set_global_account_data(&event_type, &data)?;
1118                }
1119
1120                for (room_id, events) in room_account_data {
1121                    let room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1122                    for (event_type, event) in events {
1123                        let event_type =
1124                            this.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1125                        let data = this.serialize_json(&event)?;
1126                        txn.set_room_account_data(&room_id, &event_type, &data)?;
1127                    }
1128                }
1129
1130                for (user_id, event) in presence {
1131                    let key = this.encode_presence_key(&user_id);
1132                    let value = this.serialize_json(&event)?;
1133                    txn.set_kv_blob(&key, &value)?;
1134                }
1135
1136                for (room_id, room_info) in room_infos {
1137                    let stripped = room_info.state() == RoomState::Invited;
1138                    // Remove non-stripped data for stripped rooms and vice-versa.
1139                    this.remove_maybe_stripped_room_data(txn, &room_id, !stripped)?;
1140
1141                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
1142                    let state = this
1143                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
1144                    let data = this.serialize_json(&room_info)?;
1145                    txn.set_room_info(&room_id, &state, &data)?;
1146                }
1147
1148                for (room_id, user_ids) in profiles_to_delete {
1149                    let room_id = this.encode_key(keys::PROFILE, room_id);
1150                    for user_id in user_ids {
1151                        let user_id = this.encode_key(keys::PROFILE, user_id);
1152                        txn.remove_room_profile(&room_id, &user_id)?;
1153                    }
1154                }
1155
1156                for (room_id, state_event_types) in state {
1157                    let profiles = profiles.get(&room_id);
1158                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1159
1160                    for (event_type, state_events) in state_event_types {
1161                        let encoded_event_type =
1162                            this.encode_key(keys::STATE_EVENT, event_type.to_string());
1163
1164                        for (state_key, raw_state_event) in state_events {
1165                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1166                            let data = this.serialize_json(&raw_state_event)?;
1167
1168                            let event_id: Option<String> =
1169                                raw_state_event.get_field("event_id").ok().flatten();
1170                            let encoded_event_id =
1171                                event_id.as_ref().map(|e| this.encode_key(keys::STATE_EVENT, e));
1172
1173                            txn.set_state_event(
1174                                &encoded_room_id,
1175                                &encoded_event_type,
1176                                &encoded_state_key,
1177                                false,
1178                                encoded_event_id.as_deref(),
1179                                &data,
1180                            )?;
1181
1182                            if event_type == StateEventType::RoomMember {
1183                                let member_event = match raw_state_event
1184                                    .deserialize_as::<SyncRoomMemberEvent>()
1185                                {
1186                                    Ok(ev) => ev,
1187                                    Err(e) => {
1188                                        debug!(event_id, "Failed to deserialize member event: {e}");
1189                                        continue;
1190                                    }
1191                                };
1192
1193                                let encoded_room_id = this.encode_key(keys::MEMBER, &room_id);
1194                                let user_id = this.encode_key(keys::MEMBER, &state_key);
1195                                let membership = this
1196                                    .encode_key(keys::MEMBER, member_event.membership().as_str());
1197                                let data = this.serialize_value(&state_key)?;
1198
1199                                txn.set_member(
1200                                    &encoded_room_id,
1201                                    &user_id,
1202                                    &membership,
1203                                    false,
1204                                    &data,
1205                                )?;
1206
1207                                if let Some(profile) =
1208                                    profiles.and_then(|p| p.get(member_event.state_key()))
1209                                {
1210                                    let room_id = this.encode_key(keys::PROFILE, &room_id);
1211                                    let user_id = this.encode_key(keys::PROFILE, &state_key);
1212                                    let data = this.serialize_json(&profile)?;
1213                                    txn.set_profile(&room_id, &user_id, &data)?;
1214                                }
1215                            }
1216                        }
1217                    }
1218                }
1219
1220                for (room_id, stripped_state_event_types) in stripped_state {
1221                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1222
1223                    for (event_type, stripped_state_events) in stripped_state_event_types {
1224                        let encoded_event_type =
1225                            this.encode_key(keys::STATE_EVENT, event_type.to_string());
1226
1227                        for (state_key, raw_stripped_state_event) in stripped_state_events {
1228                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1229                            let data = this.serialize_json(&raw_stripped_state_event)?;
1230                            txn.set_state_event(
1231                                &encoded_room_id,
1232                                &encoded_event_type,
1233                                &encoded_state_key,
1234                                true,
1235                                None,
1236                                &data,
1237                            )?;
1238
1239                            if event_type == StateEventType::RoomMember {
1240                                let member_event = match raw_stripped_state_event
1241                                    .deserialize_as::<StrippedRoomMemberEvent>(
1242                                ) {
1243                                    Ok(ev) => ev,
1244                                    Err(e) => {
1245                                        debug!("Failed to deserialize stripped member event: {e}");
1246                                        continue;
1247                                    }
1248                                };
1249
1250                                let room_id = this.encode_key(keys::MEMBER, &room_id);
1251                                let user_id = this.encode_key(keys::MEMBER, &state_key);
1252                                let membership = this.encode_key(
1253                                    keys::MEMBER,
1254                                    member_event.content.membership.as_str(),
1255                                );
1256                                let data = this.serialize_value(&state_key)?;
1257
1258                                txn.set_member(&room_id, &user_id, &membership, true, &data)?;
1259                            }
1260                        }
1261                    }
1262                }
1263
1264                for (room_id, receipt_event) in receipts {
1265                    let room_id = this.encode_key(keys::RECEIPT, room_id);
1266
1267                    for (event_id, receipt_types) in receipt_event {
1268                        let encoded_event_id = this.encode_key(keys::RECEIPT, &event_id);
1269
1270                        for (receipt_type, receipt_users) in receipt_types {
1271                            let receipt_type =
1272                                this.encode_key(keys::RECEIPT, receipt_type.as_str());
1273
1274                            for (user_id, receipt) in receipt_users {
1275                                let encoded_user_id = this.encode_key(keys::RECEIPT, &user_id);
1276                                // We cannot have a NULL primary key so we rely on serialization
1277                                // instead of the string representation.
1278                                let thread = this.encode_key(
1279                                    keys::RECEIPT,
1280                                    rmp_serde::to_vec_named(&receipt.thread)?,
1281                                );
1282                                let data = this.serialize_json(&ReceiptData {
1283                                    receipt,
1284                                    event_id: event_id.clone(),
1285                                    user_id,
1286                                })?;
1287
1288                                txn.set_receipt(
1289                                    &room_id,
1290                                    &encoded_user_id,
1291                                    &receipt_type,
1292                                    &thread,
1293                                    &encoded_event_id,
1294                                    &data,
1295                                )?;
1296                            }
1297                        }
1298                    }
1299                }
1300
1301                for (room_id, redactions) in redactions {
1302                    let make_room_version = || {
1303                        let encoded_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1304                        txn.get_room_info(&encoded_room_id)
1305                            .ok()
1306                            .flatten()
1307                            .and_then(|v| this.deserialize_json::<RoomInfo>(&v).ok())
1308                            .and_then(|info| info.room_version().cloned())
1309                            .unwrap_or_else(|| {
1310                                warn!(
1311                                    ?room_id,
1312                                    "Unable to find the room version, assume version 9"
1313                                );
1314                                RoomVersionId::V9
1315                            })
1316                    };
1317
1318                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1319                    let mut room_version = None;
1320
1321                    for (event_id, redaction) in redactions {
1322                        let event_id = this.encode_key(keys::STATE_EVENT, event_id);
1323
1324                        if let Some(Ok(raw_event)) = txn
1325                            .get_state_event_by_id(&encoded_room_id, &event_id)?
1326                            .map(|value| this.deserialize_json::<Raw<AnySyncStateEvent>>(&value))
1327                        {
1328                            let event = raw_event.deserialize()?;
1329                            let redacted = redact(
1330                                raw_event.deserialize_as::<CanonicalJsonObject>()?,
1331                                room_version.get_or_insert_with(make_room_version),
1332                                Some(RedactedBecause::from_raw_event(&redaction)?),
1333                            )
1334                            .map_err(Error::Redaction)?;
1335                            let data = this.serialize_json(&redacted)?;
1336
1337                            let event_type =
1338                                this.encode_key(keys::STATE_EVENT, event.event_type().to_string());
1339                            let state_key = this.encode_key(keys::STATE_EVENT, event.state_key());
1340
1341                            txn.set_state_event(
1342                                &encoded_room_id,
1343                                &event_type,
1344                                &state_key,
1345                                false,
1346                                Some(&event_id),
1347                                &data,
1348                            )?;
1349                        }
1350                    }
1351                }
1352
1353                for (room_id, display_names) in ambiguity_maps {
1354                    let room_id = this.encode_key(keys::DISPLAY_NAME, room_id);
1355
1356                    for (name, user_ids) in display_names {
1357                        let encoded_name = this.encode_key(
1358                            keys::DISPLAY_NAME,
1359                            name.as_normalized_str().unwrap_or_else(|| name.as_raw_str()),
1360                        );
1361                        let data = this.serialize_json(&user_ids)?;
1362
1363                        if user_ids.is_empty() {
1364                            txn.remove_display_name(&room_id, &encoded_name)?;
1365
1366                            // We can't do a migration to merge the previously distinct buckets of
1367                            // user IDs since the display names themselves are hashed before they
1368                            // are persisted in the store. So the store will always retain two
1369                            // buckets: one for raw display names and one for normalised ones.
1370                            //
1371                            // We therefore do the next best thing, which is a sort of a soft
1372                            // migration: we fetch both the raw and normalised buckets, then merge
1373                            // the user IDs contained in them into a separate, temporary merged
1374                            // bucket. The SDK then operates on the merged buckets exclusively. See
1375                            // the comment in `get_users_with_display_names` for details.
1376                            //
1377                            // If the merged bucket is empty, that must mean that both the raw and
1378                            // normalised buckets were also empty, so we can remove both from the
1379                            // store.
1380                            let raw_name = this.encode_key(keys::DISPLAY_NAME, name.as_raw_str());
1381                            txn.remove_display_name(&room_id, &raw_name)?;
1382                        } else {
1383                            // We only create new buckets with the normalized display name.
1384                            txn.set_display_name(&room_id, &encoded_name, &data)?;
1385                        }
1386                    }
1387                }
1388
1389                Ok::<_, Error>(())
1390            })
1391            .await?;
1392
1393        Ok(())
1394    }
1395
1396    async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1397        self.acquire()
1398            .await?
1399            .get_kv_blob(self.encode_presence_key(user_id))
1400            .await?
1401            .map(|data| self.deserialize_json(&data))
1402            .transpose()
1403    }
1404
1405    async fn get_presence_events(
1406        &self,
1407        user_ids: &[OwnedUserId],
1408    ) -> Result<Vec<Raw<PresenceEvent>>> {
1409        if user_ids.is_empty() {
1410            return Ok(Vec::new());
1411        }
1412
1413        let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
1414        self.acquire()
1415            .await?
1416            .get_kv_blobs(user_ids)
1417            .await?
1418            .into_iter()
1419            .map(|data| self.deserialize_json(&data))
1420            .collect()
1421    }
1422
1423    async fn get_state_event(
1424        &self,
1425        room_id: &RoomId,
1426        event_type: StateEventType,
1427        state_key: &str,
1428    ) -> Result<Option<RawAnySyncOrStrippedState>> {
1429        Ok(self
1430            .get_state_events_for_keys(room_id, event_type, &[state_key])
1431            .await?
1432            .into_iter()
1433            .next())
1434    }
1435
1436    async fn get_state_events(
1437        &self,
1438        room_id: &RoomId,
1439        event_type: StateEventType,
1440    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1441        let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1442        let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1443        self.acquire()
1444            .await?
1445            .get_maybe_stripped_state_events(room_id, event_type)
1446            .await?
1447            .into_iter()
1448            .map(|(stripped, data)| {
1449                let ev = if stripped {
1450                    RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1451                } else {
1452                    RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1453                };
1454
1455                Ok(ev)
1456            })
1457            .collect()
1458    }
1459
1460    async fn get_state_events_for_keys(
1461        &self,
1462        room_id: &RoomId,
1463        event_type: StateEventType,
1464        state_keys: &[&str],
1465    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1466        if state_keys.is_empty() {
1467            return Ok(Vec::new());
1468        }
1469
1470        let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1471        let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1472        let state_keys = state_keys.iter().map(|k| self.encode_key(keys::STATE_EVENT, k)).collect();
1473        self.acquire()
1474            .await?
1475            .get_maybe_stripped_state_events_for_keys(room_id, event_type, state_keys)
1476            .await?
1477            .into_iter()
1478            .map(|(stripped, data)| {
1479                let ev = if stripped {
1480                    RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1481                } else {
1482                    RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1483                };
1484
1485                Ok(ev)
1486            })
1487            .collect()
1488    }
1489
1490    async fn get_profile(
1491        &self,
1492        room_id: &RoomId,
1493        user_id: &UserId,
1494    ) -> Result<Option<MinimalRoomMemberEvent>> {
1495        let room_id = self.encode_key(keys::PROFILE, room_id);
1496        let user_ids = vec![self.encode_key(keys::PROFILE, user_id)];
1497
1498        self.acquire()
1499            .await?
1500            .get_profiles(room_id, user_ids)
1501            .await?
1502            .into_iter()
1503            .next()
1504            .map(|(_, data)| self.deserialize_json(&data))
1505            .transpose()
1506    }
1507
1508    async fn get_profiles<'a>(
1509        &self,
1510        room_id: &RoomId,
1511        user_ids: &'a [OwnedUserId],
1512    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1513        if user_ids.is_empty() {
1514            return Ok(BTreeMap::new());
1515        }
1516
1517        let room_id = self.encode_key(keys::PROFILE, room_id);
1518        let mut user_ids_map = user_ids
1519            .iter()
1520            .map(|u| (self.encode_key(keys::PROFILE, u), u.as_ref()))
1521            .collect::<BTreeMap<_, _>>();
1522        let user_ids = user_ids_map.keys().cloned().collect();
1523
1524        self.acquire()
1525            .await?
1526            .get_profiles(room_id, user_ids)
1527            .await?
1528            .into_iter()
1529            .map(|(user_id, data)| {
1530                Ok((
1531                    user_ids_map
1532                        .remove(user_id.as_slice())
1533                        .expect("returned user IDs were requested"),
1534                    self.deserialize_json(&data)?,
1535                ))
1536            })
1537            .collect()
1538    }
1539
1540    async fn get_user_ids(
1541        &self,
1542        room_id: &RoomId,
1543        membership: RoomMemberships,
1544    ) -> Result<Vec<OwnedUserId>> {
1545        let room_id = self.encode_key(keys::MEMBER, room_id);
1546        let memberships = membership
1547            .as_vec()
1548            .into_iter()
1549            .map(|m| self.encode_key(keys::MEMBER, m.as_str()))
1550            .collect();
1551        self.acquire()
1552            .await?
1553            .get_user_ids(room_id, memberships)
1554            .await?
1555            .iter()
1556            .map(|data| self.deserialize_value(data))
1557            .collect()
1558    }
1559
1560    async fn get_room_infos(&self) -> Result<Vec<RoomInfo>> {
1561        self.acquire()
1562            .await?
1563            .get_room_infos()
1564            .await?
1565            .into_iter()
1566            .map(|data| self.deserialize_json(&data))
1567            .collect()
1568    }
1569
1570    async fn get_users_with_display_name(
1571        &self,
1572        room_id: &RoomId,
1573        display_name: &DisplayName,
1574    ) -> Result<BTreeSet<OwnedUserId>> {
1575        let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1576        let names = vec![self.encode_key(
1577            keys::DISPLAY_NAME,
1578            display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1579        )];
1580
1581        Ok(self
1582            .acquire()
1583            .await?
1584            .get_display_names(room_id, names)
1585            .await?
1586            .into_iter()
1587            .next()
1588            .map(|(_, data)| self.deserialize_json(&data))
1589            .transpose()?
1590            .unwrap_or_default())
1591    }
1592
1593    async fn get_users_with_display_names<'a>(
1594        &self,
1595        room_id: &RoomId,
1596        display_names: &'a [DisplayName],
1597    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1598        let mut result = HashMap::new();
1599
1600        if display_names.is_empty() {
1601            return Ok(result);
1602        }
1603
1604        let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1605        let mut names_map = display_names
1606            .iter()
1607            .flat_map(|display_name| {
1608                // We encode the display name as the `raw_str()` and the normalized string.
1609                //
1610                // This is for compatibility reasons since:
1611                //  1. Previously "Alice" and "alice" were considered to be distinct display
1612                //     names, while we now consider them to be the same so we need to merge the
1613                //     previously distinct buckets of user IDs.
1614                //  2. We can't do a migration to merge the previously distinct buckets of user
1615                //     IDs since the display names itself are hashed before they are persisted
1616                //     in the store.
1617                let raw =
1618                    (self.encode_key(keys::DISPLAY_NAME, display_name.as_raw_str()), display_name);
1619                let normalized = display_name.as_normalized_str().map(|normalized| {
1620                    (self.encode_key(keys::DISPLAY_NAME, normalized), display_name)
1621                });
1622
1623                iter::once(raw).chain(normalized.into_iter())
1624            })
1625            .collect::<BTreeMap<_, _>>();
1626        let names = names_map.keys().cloned().collect();
1627
1628        for (name, data) in
1629            self.acquire().await?.get_display_names(room_id, names).await?.into_iter()
1630        {
1631            let display_name =
1632                names_map.remove(name.as_slice()).expect("returned display names were requested");
1633            let user_ids: BTreeSet<_> = self.deserialize_json(&data)?;
1634
1635            result.entry(display_name).or_insert_with(BTreeSet::new).extend(user_ids);
1636        }
1637
1638        Ok(result)
1639    }
1640
1641    async fn get_account_data_event(
1642        &self,
1643        event_type: GlobalAccountDataEventType,
1644    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1645        let event_type = self.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1646        self.acquire()
1647            .await?
1648            .get_global_account_data(event_type)
1649            .await?
1650            .map(|value| self.deserialize_json(&value))
1651            .transpose()
1652    }
1653
1654    async fn get_room_account_data_event(
1655        &self,
1656        room_id: &RoomId,
1657        event_type: RoomAccountDataEventType,
1658    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1659        let room_id = self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1660        let event_type = self.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1661        self.acquire()
1662            .await?
1663            .get_room_account_data(room_id, event_type)
1664            .await?
1665            .map(|value| self.deserialize_json(&value))
1666            .transpose()
1667    }
1668
1669    async fn get_user_room_receipt_event(
1670        &self,
1671        room_id: &RoomId,
1672        receipt_type: ReceiptType,
1673        thread: ReceiptThread,
1674        user_id: &UserId,
1675    ) -> Result<Option<(OwnedEventId, Receipt)>> {
1676        let room_id = self.encode_key(keys::RECEIPT, room_id);
1677        let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1678        // We cannot have a NULL primary key so we rely on serialization instead of the
1679        // string representation.
1680        let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1681        let user_id = self.encode_key(keys::RECEIPT, user_id);
1682
1683        self.acquire()
1684            .await?
1685            .get_user_receipt(room_id, receipt_type, thread, user_id)
1686            .await?
1687            .map(|value| {
1688                self.deserialize_json::<ReceiptData>(&value).map(|d| (d.event_id, d.receipt))
1689            })
1690            .transpose()
1691    }
1692
1693    async fn get_event_room_receipt_events(
1694        &self,
1695        room_id: &RoomId,
1696        receipt_type: ReceiptType,
1697        thread: ReceiptThread,
1698        event_id: &EventId,
1699    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1700        let room_id = self.encode_key(keys::RECEIPT, room_id);
1701        let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1702        // We cannot have a NULL primary key so we rely on serialization instead of the
1703        // string representation.
1704        let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1705        let event_id = self.encode_key(keys::RECEIPT, event_id);
1706
1707        self.acquire()
1708            .await?
1709            .get_event_receipts(room_id, receipt_type, thread, event_id)
1710            .await?
1711            .iter()
1712            .map(|value| {
1713                self.deserialize_json::<ReceiptData>(value).map(|d| (d.user_id, d.receipt))
1714            })
1715            .collect()
1716    }
1717
1718    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1719        self.acquire().await?.get_kv_blob(self.encode_custom_key(key)).await
1720    }
1721
1722    async fn set_custom_value_no_read(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
1723        let conn = self.acquire().await?;
1724        let key = self.encode_custom_key(key);
1725        conn.set_kv_blob(key, value).await?;
1726        Ok(())
1727    }
1728
1729    async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1730        let conn = self.acquire().await?;
1731        let key = self.encode_custom_key(key);
1732        let previous = conn.get_kv_blob(key.clone()).await?;
1733        conn.set_kv_blob(key, value).await?;
1734        Ok(previous)
1735    }
1736
1737    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1738        let conn = self.acquire().await?;
1739        let key = self.encode_custom_key(key);
1740        let previous = conn.get_kv_blob(key.clone()).await?;
1741        if previous.is_some() {
1742            conn.delete_kv_blob(key).await?;
1743        }
1744        Ok(previous)
1745    }
1746
1747    async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1748        let this = self.clone();
1749        let room_id = room_id.to_owned();
1750
1751        let conn = self.acquire().await?;
1752
1753        conn.with_transaction(move |txn| -> Result<()> {
1754            let room_info_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1755            txn.remove_room_info(&room_info_room_id)?;
1756
1757            let state_event_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1758            txn.remove_room_state_events(&state_event_room_id, None)?;
1759
1760            let member_room_id = this.encode_key(keys::MEMBER, &room_id);
1761            txn.remove_room_members(&member_room_id, None)?;
1762
1763            let profile_room_id = this.encode_key(keys::PROFILE, &room_id);
1764            txn.remove_room_profiles(&profile_room_id)?;
1765
1766            let room_account_data_room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, &room_id);
1767            txn.remove_room_account_data(&room_account_data_room_id)?;
1768
1769            let receipt_room_id = this.encode_key(keys::RECEIPT, &room_id);
1770            txn.remove_room_receipts(&receipt_room_id)?;
1771
1772            let display_name_room_id = this.encode_key(keys::DISPLAY_NAME, &room_id);
1773            txn.remove_room_display_names(&display_name_room_id)?;
1774
1775            let send_queue_room_id = this.encode_key(keys::SEND_QUEUE, &room_id);
1776            txn.remove_room_send_queue(&send_queue_room_id)?;
1777
1778            let dependent_send_queue_room_id =
1779                this.encode_key(keys::DEPENDENTS_SEND_QUEUE, &room_id);
1780            txn.remove_room_dependent_send_queue(&dependent_send_queue_room_id)?;
1781
1782            Ok(())
1783        })
1784        .await?;
1785
1786        conn.vacuum().await
1787    }
1788
1789    async fn save_send_queue_request(
1790        &self,
1791        room_id: &RoomId,
1792        transaction_id: OwnedTransactionId,
1793        created_at: MilliSecondsSinceUnixEpoch,
1794        content: QueuedRequestKind,
1795        priority: usize,
1796    ) -> Result<(), Self::Error> {
1797        let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
1798        let room_id_value = self.serialize_value(&room_id.to_owned())?;
1799
1800        let content = self.serialize_json(&content)?;
1801        // The transaction id is used both as a key (in remove/update) and a value (as
1802        // it's useful for the callers), so we keep it as is, and neither hash
1803        // it (with encode_key) or encrypt it (through serialize_value). After
1804        // all, it carries no personal information, so this is considered fine.
1805
1806        let created_at_ts: u64 = created_at.0.into();
1807        self.acquire()
1808            .await?
1809            .with_transaction(move |txn| {
1810                txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
1811                Ok(())
1812            })
1813            .await
1814    }
1815
1816    async fn update_send_queue_request(
1817        &self,
1818        room_id: &RoomId,
1819        transaction_id: &TransactionId,
1820        content: QueuedRequestKind,
1821    ) -> Result<bool, Self::Error> {
1822        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1823
1824        let content = self.serialize_json(&content)?;
1825        // See comment in [`Self::save_send_queue_event`] to understand why the
1826        // transaction id is neither encrypted or hashed.
1827        let transaction_id = transaction_id.to_string();
1828
1829        let num_updated = self.acquire()
1830            .await?
1831            .with_transaction(move |txn| {
1832                txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
1833            })
1834            .await?;
1835
1836        Ok(num_updated > 0)
1837    }
1838
1839    async fn remove_send_queue_request(
1840        &self,
1841        room_id: &RoomId,
1842        transaction_id: &TransactionId,
1843    ) -> Result<bool, Self::Error> {
1844        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1845
1846        // See comment in `save_send_queue_event`.
1847        let transaction_id = transaction_id.to_string();
1848
1849        let num_deleted = self
1850            .acquire()
1851            .await?
1852            .with_transaction(move |txn| {
1853                txn.prepare_cached(
1854                    "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
1855                )?
1856                .execute((room_id, &transaction_id))
1857            })
1858            .await?;
1859
1860        Ok(num_deleted > 0)
1861    }
1862
1863    async fn load_send_queue_requests(
1864        &self,
1865        room_id: &RoomId,
1866    ) -> Result<Vec<QueuedRequest>, Self::Error> {
1867        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1868
1869        // Note: ROWID is always present and is an auto-incremented integer counter. We
1870        // want to maintain the insertion order, so we can sort using it.
1871        // Note 2: transaction_id is not encoded, see why in `save_send_queue_event`.
1872        let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, Option<u64>)> = self
1873            .acquire()
1874            .await?
1875            .prepare(
1876                "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
1877                |mut stmt| {
1878                    stmt.query((room_id,))?
1879                        .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
1880                        .collect()
1881                },
1882            )
1883            .await?;
1884
1885        let mut requests = Vec::with_capacity(res.len());
1886        for entry in res {
1887            let created_at = entry
1888                .4
1889                .and_then(UInt::new)
1890                .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
1891            requests.push(QueuedRequest {
1892                transaction_id: entry.0.into(),
1893                kind: self.deserialize_json(&entry.1)?,
1894                error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
1895                priority: entry.3,
1896                created_at,
1897            });
1898        }
1899
1900        Ok(requests)
1901    }
1902
1903    async fn update_send_queue_request_status(
1904        &self,
1905        room_id: &RoomId,
1906        transaction_id: &TransactionId,
1907        error: Option<QueueWedgeError>,
1908    ) -> Result<(), Self::Error> {
1909        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1910
1911        // See comment in `save_send_queue_event`.
1912        let transaction_id = transaction_id.to_string();
1913
1914        // Serialize the error to json bytes (encrypted if option is enabled) if set.
1915        let error_value = error.map(|e| self.serialize_value(&e)).transpose()?;
1916
1917        self.acquire()
1918            .await?
1919            .with_transaction(move |txn| {
1920                txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?;
1921                Ok(())
1922            })
1923            .await
1924    }
1925
1926    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1927        // If the values were not encrypted, we could use `SELECT DISTINCT` here, but we
1928        // have to manually do the deduplication: indeed, for all X, encrypt(X)
1929        // != encrypted(X), since we use a nonce in the encryption process.
1930
1931        let res: Vec<Vec<u8>> = self
1932            .acquire()
1933            .await?
1934            .prepare("SELECT room_id_val FROM send_queue_events", |mut stmt| {
1935                stmt.query(())?.mapped(|row| row.get(0)).collect()
1936            })
1937            .await?;
1938
1939        // So we collect the results into a `BTreeSet` to perform the deduplication, and
1940        // then rejigger that into a vector.
1941        Ok(res
1942            .into_iter()
1943            .map(|entry| self.deserialize_value(&entry))
1944            .collect::<Result<BTreeSet<OwnedRoomId>, _>>()?
1945            .into_iter()
1946            .collect())
1947    }
1948
1949    async fn save_dependent_queued_request(
1950        &self,
1951        room_id: &RoomId,
1952        parent_txn_id: &TransactionId,
1953        own_txn_id: ChildTransactionId,
1954        created_at: MilliSecondsSinceUnixEpoch,
1955        content: DependentQueuedRequestKind,
1956    ) -> Result<()> {
1957        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
1958        let content = self.serialize_json(&content)?;
1959
1960        // See comment in `save_send_queue_event`.
1961        let parent_txn_id = parent_txn_id.to_string();
1962        let own_txn_id = own_txn_id.to_string();
1963
1964        let created_at_ts: u64 = created_at.0.into();
1965        self.acquire()
1966            .await?
1967            .with_transaction(move |txn| {
1968                txn.prepare_cached(
1969                    r#"INSERT INTO dependent_send_queue_events
1970                         (room_id, parent_transaction_id, own_transaction_id, content, created_at)
1971                       VALUES (?, ?, ?, ?, ?)"#,
1972                )?
1973                .execute((
1974                    room_id,
1975                    parent_txn_id,
1976                    own_txn_id,
1977                    content,
1978                    created_at_ts,
1979                ))?;
1980                Ok(())
1981            })
1982            .await
1983    }
1984
1985    async fn update_dependent_queued_request(
1986        &self,
1987        room_id: &RoomId,
1988        own_transaction_id: &ChildTransactionId,
1989        new_content: DependentQueuedRequestKind,
1990    ) -> Result<bool> {
1991        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
1992        let content = self.serialize_json(&new_content)?;
1993
1994        // See comment in `save_send_queue_event`.
1995        let own_txn_id = own_transaction_id.to_string();
1996
1997        let num_updated = self
1998            .acquire()
1999            .await?
2000            .with_transaction(move |txn| {
2001                txn.prepare_cached(
2002                    r#"UPDATE dependent_send_queue_events
2003                       SET content = ?
2004                       WHERE own_transaction_id = ?
2005                       AND room_id = ?"#,
2006                )?
2007                .execute((content, own_txn_id, room_id))
2008            })
2009            .await?;
2010
2011        if num_updated > 1 {
2012            return Err(Error::InconsistentUpdate);
2013        }
2014
2015        Ok(num_updated == 1)
2016    }
2017
2018    async fn mark_dependent_queued_requests_as_ready(
2019        &self,
2020        room_id: &RoomId,
2021        parent_txn_id: &TransactionId,
2022        parent_key: SentRequestKey,
2023    ) -> Result<usize> {
2024        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2025        let parent_key = self.serialize_value(&parent_key)?;
2026
2027        // See comment in `save_send_queue_event`.
2028        let parent_txn_id = parent_txn_id.to_string();
2029
2030        self.acquire()
2031            .await?
2032            .with_transaction(move |txn| {
2033                Ok(txn.prepare_cached(
2034                    "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?",
2035                )?
2036                .execute((parent_key, parent_txn_id, room_id))?)
2037            })
2038            .await
2039    }
2040
2041    async fn remove_dependent_queued_request(
2042        &self,
2043        room_id: &RoomId,
2044        txn_id: &ChildTransactionId,
2045    ) -> Result<bool> {
2046        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2047
2048        // See comment in `save_send_queue_event`.
2049        let txn_id = txn_id.to_string();
2050
2051        let num_deleted = self
2052            .acquire()
2053            .await?
2054            .with_transaction(move |txn| {
2055                txn.prepare_cached(
2056                    "DELETE FROM dependent_send_queue_events WHERE own_transaction_id = ? AND room_id = ?",
2057                )?
2058                .execute((txn_id, room_id))
2059            })
2060            .await?;
2061
2062        Ok(num_deleted > 0)
2063    }
2064
2065    async fn load_dependent_queued_requests(
2066        &self,
2067        room_id: &RoomId,
2068    ) -> Result<Vec<DependentQueuedRequest>> {
2069        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2070
2071        // Note: transaction_id is not encoded, see why in `save_send_queue_event`.
2072        let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, Option<u64>)> = self
2073            .acquire()
2074            .await?
2075            .prepare(
2076                "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2077                |mut stmt| {
2078                    stmt.query((room_id,))?
2079                        .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2080                        .collect()
2081                },
2082            )
2083            .await?;
2084
2085        let mut dependent_events = Vec::with_capacity(res.len());
2086        for entry in res {
2087            let created_at = entry
2088                .4
2089                .and_then(UInt::new)
2090                .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2091            dependent_events.push(DependentQueuedRequest {
2092                own_transaction_id: entry.0.into(),
2093                parent_transaction_id: entry.1.into(),
2094                parent_key: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?,
2095                kind: self.deserialize_json(&entry.3)?,
2096                created_at,
2097            });
2098        }
2099
2100        Ok(dependent_events)
2101    }
2102}
2103
2104#[derive(Debug, Clone, Serialize, Deserialize)]
2105struct ReceiptData {
2106    receipt: Receipt,
2107    event_id: OwnedEventId,
2108    user_id: OwnedUserId,
2109}
2110
2111#[cfg(test)]
2112mod tests {
2113    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
2114
2115    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
2116    use once_cell::sync::Lazy;
2117    use tempfile::{tempdir, TempDir};
2118
2119    use super::SqliteStateStore;
2120
2121    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2122    static NUM: AtomicU32 = AtomicU32::new(0);
2123
2124    async fn get_store() -> Result<impl StateStore, StoreError> {
2125        let name = NUM.fetch_add(1, SeqCst).to_string();
2126        let tmpdir_path = TMP_DIR.path().join(name);
2127
2128        tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2129
2130        Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
2131    }
2132
2133    statestore_integration_tests!();
2134}
2135
2136#[cfg(test)]
2137mod encrypted_tests {
2138    use std::{
2139        path::PathBuf,
2140        sync::atomic::{AtomicU32, Ordering::SeqCst},
2141    };
2142
2143    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
2144    use matrix_sdk_test::async_test;
2145    use once_cell::sync::Lazy;
2146    use tempfile::{tempdir, TempDir};
2147
2148    use super::SqliteStateStore;
2149    use crate::{utils::SqliteAsyncConnExt, SqliteStoreConfig};
2150
2151    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2152    static NUM: AtomicU32 = AtomicU32::new(0);
2153
2154    fn new_state_store_workspace() -> PathBuf {
2155        let name = NUM.fetch_add(1, SeqCst).to_string();
2156        TMP_DIR.path().join(name)
2157    }
2158
2159    async fn get_store() -> Result<impl StateStore, StoreError> {
2160        let tmpdir_path = new_state_store_workspace();
2161
2162        tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2163
2164        Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), Some("default_test_password"))
2165            .await
2166            .unwrap())
2167    }
2168
2169    #[async_test]
2170    async fn test_pool_size() {
2171        let tmpdir_path = new_state_store_workspace();
2172        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
2173
2174        let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2175
2176        assert_eq!(store.pool.status().max_size, 42);
2177    }
2178
2179    #[async_test]
2180    async fn test_cache_size() {
2181        let tmpdir_path = new_state_store_workspace();
2182        let store_open_config = SqliteStoreConfig::new(tmpdir_path).cache_size(1500);
2183
2184        let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2185
2186        let conn = store.pool.get().await.unwrap();
2187        let cache_size =
2188            conn.query_row("PRAGMA cache_size", (), |row| row.get::<_, i32>(0)).await.unwrap();
2189
2190        // The value passed to  `SqliteStoreConfig` is in bytes. Check it is converted
2191        // to kibibytes. Also, it must be a negative value because it _is_ the size in
2192        // kibibytes, not in page size.
2193        assert_eq!(cache_size, -(1500 / 1024));
2194    }
2195
2196    #[async_test]
2197    async fn test_journal_size_limit() {
2198        let tmpdir_path = new_state_store_workspace();
2199        let store_open_config = SqliteStoreConfig::new(tmpdir_path).journal_size_limit(1500);
2200
2201        let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2202
2203        let conn = store.pool.get().await.unwrap();
2204        let journal_size_limit = conn
2205            .query_row("PRAGMA journal_size_limit", (), |row| row.get::<_, u32>(0))
2206            .await
2207            .unwrap();
2208
2209        // The value passed to  `SqliteStoreConfig` is in bytes. It stays in bytes in
2210        // SQLite.
2211        assert_eq!(journal_size_limit, 1500);
2212    }
2213
2214    statestore_integration_tests!();
2215}
2216
2217#[cfg(test)]
2218mod migration_tests {
2219    use std::{
2220        path::{Path, PathBuf},
2221        sync::{
2222            atomic::{AtomicU32, Ordering::SeqCst},
2223            Arc,
2224        },
2225    };
2226
2227    use deadpool_sqlite::Runtime;
2228    use matrix_sdk_base::{
2229        store::{ChildTransactionId, DependentQueuedRequestKind, SerializableEventContent},
2230        sync::UnreadNotificationsCount,
2231        RoomState, StateStore,
2232    };
2233    use matrix_sdk_test::async_test;
2234    use once_cell::sync::Lazy;
2235    use ruma::{
2236        events::{
2237            room::{create::RoomCreateEventContent, message::RoomMessageEventContent},
2238            StateEventType,
2239        },
2240        room_id, server_name, user_id, EventId, MilliSecondsSinceUnixEpoch, RoomId, TransactionId,
2241        UserId,
2242    };
2243    use rusqlite::Transaction;
2244    use serde_json::json;
2245    use tempfile::{tempdir, TempDir};
2246    use tokio::fs;
2247
2248    use super::{init, keys, SqliteStateStore, DATABASE_NAME};
2249    use crate::{
2250        error::{Error, Result},
2251        utils::{SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
2252        OpenStoreError,
2253    };
2254
2255    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2256    static NUM: AtomicU32 = AtomicU32::new(0);
2257    const SECRET: &str = "secret";
2258
2259    fn new_path() -> PathBuf {
2260        let name = NUM.fetch_add(1, SeqCst).to_string();
2261        TMP_DIR.path().join(name)
2262    }
2263
2264    async fn create_fake_db(path: &Path, version: u8) -> Result<SqliteStateStore> {
2265        fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir).unwrap();
2266
2267        let config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
2268        // use default pool config
2269
2270        let pool = config.create_pool(Runtime::Tokio1).unwrap();
2271        let conn = pool.get().await?;
2272
2273        init(&conn).await?;
2274
2275        let store_cipher = Some(Arc::new(conn.get_or_create_store_cipher(SECRET).await.unwrap()));
2276        let this = SqliteStateStore { store_cipher, pool };
2277        this.run_migrations(&conn, 1, Some(version)).await?;
2278
2279        Ok(this)
2280    }
2281
2282    fn room_info_v1_json(
2283        room_id: &RoomId,
2284        state: RoomState,
2285        name: Option<&str>,
2286        creator: Option<&UserId>,
2287    ) -> serde_json::Value {
2288        // Test with name set or not.
2289        let name_content = match name {
2290            Some(name) => json!({ "name": name }),
2291            None => json!({ "name": null }),
2292        };
2293        // Test with creator set or not.
2294        let create_content = match creator {
2295            Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2296            None => RoomCreateEventContent::new_v11(),
2297        };
2298
2299        json!({
2300            "room_id": room_id,
2301            "room_type": state,
2302            "notification_counts": UnreadNotificationsCount::default(),
2303            "summary": {
2304                "heroes": [],
2305                "joined_member_count": 0,
2306                "invited_member_count": 0,
2307            },
2308            "members_synced": false,
2309            "base_info": {
2310                "dm_targets": [],
2311                "max_power_level": 100,
2312                "name": {
2313                    "Original": {
2314                        "content": name_content,
2315                    },
2316                },
2317                "create": {
2318                    "Original": {
2319                        "content": create_content,
2320                    }
2321                }
2322            },
2323        })
2324    }
2325
2326    #[async_test]
2327    pub async fn test_migrating_v1_to_v2() {
2328        let path = new_path();
2329        // Create and populate db.
2330        {
2331            let db = create_fake_db(&path, 1).await.unwrap();
2332            let conn = db.pool.get().await.unwrap();
2333
2334            let this = db.clone();
2335            conn.with_transaction(move |txn| {
2336                for i in 0..5 {
2337                    let room_id = RoomId::parse(format!("!room_{i}:localhost")).unwrap();
2338                    let (state, stripped) =
2339                        if i < 3 { (RoomState::Joined, false) } else { (RoomState::Invited, true) };
2340                    let info = room_info_v1_json(&room_id, state, None, None);
2341
2342                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
2343                    let data = this.serialize_json(&info)?;
2344
2345                    txn.prepare_cached(
2346                        "INSERT INTO room_info (room_id, stripped, data)
2347                         VALUES (?, ?, ?)",
2348                    )?
2349                    .execute((room_id, stripped, data))?;
2350                }
2351
2352                Result::<_, Error>::Ok(())
2353            })
2354            .await
2355            .unwrap();
2356        }
2357
2358        // This transparently migrates to the latest version.
2359        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2360
2361        // Check all room infos are there.
2362        assert_eq!(store.get_room_infos().await.unwrap().len(), 5);
2363    }
2364
2365    // Add a room in version 2 format of the state store.
2366    fn add_room_v2(
2367        this: &SqliteStateStore,
2368        txn: &Transaction<'_>,
2369        room_id: &RoomId,
2370        name: Option<&str>,
2371        create_creator: Option<&UserId>,
2372        create_sender: Option<&UserId>,
2373    ) -> Result<(), Error> {
2374        let room_info_json = room_info_v1_json(room_id, RoomState::Joined, name, create_creator);
2375
2376        let encoded_room_id = this.encode_key(keys::ROOM_INFO, room_id);
2377        let encoded_state =
2378            this.encode_key(keys::ROOM_INFO, serde_json::to_string(&RoomState::Joined)?);
2379        let data = this.serialize_json(&room_info_json)?;
2380
2381        txn.prepare_cached(
2382            "INSERT INTO room_info (room_id, state, data)
2383             VALUES (?, ?, ?)",
2384        )?
2385        .execute((encoded_room_id, encoded_state, data))?;
2386
2387        // Test with or without `m.room.create` event in the room state.
2388        let Some(create_sender) = create_sender else {
2389            return Ok(());
2390        };
2391
2392        let create_content = match create_creator {
2393            Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2394            None => RoomCreateEventContent::new_v11(),
2395        };
2396
2397        let event_id = EventId::new(server_name!("dummy.local"));
2398        let create_event = json!({
2399            "content": create_content,
2400            "event_id": event_id,
2401            "sender": create_sender.to_owned(),
2402            "origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
2403            "state_key": "",
2404            "type": "m.room.create",
2405            "unsigned": {},
2406        });
2407
2408        let encoded_room_id = this.encode_key(keys::STATE_EVENT, room_id);
2409        let encoded_event_type =
2410            this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
2411        let encoded_state_key = this.encode_key(keys::STATE_EVENT, "");
2412        let stripped = false;
2413        let encoded_event_id = this.encode_key(keys::STATE_EVENT, event_id);
2414        let data = this.serialize_json(&create_event)?;
2415
2416        txn.prepare_cached(
2417            "INSERT
2418             INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
2419             VALUES (?, ?, ?, ?, ?, ?)",
2420        )?
2421        .execute((
2422            encoded_room_id,
2423            encoded_event_type,
2424            encoded_state_key,
2425            stripped,
2426            encoded_event_id,
2427            data,
2428        ))?;
2429
2430        Ok(())
2431    }
2432
2433    #[async_test]
2434    pub async fn test_migrating_v2_to_v3() {
2435        let path = new_path();
2436
2437        // Room A: with name, creator and sender.
2438        let room_a_id = room_id!("!room_a:dummy.local");
2439        let room_a_name = "Room A";
2440        let room_a_creator = user_id!("@creator:dummy.local");
2441        // Use a different sender to check that sender is used over creator in
2442        // migration.
2443        let room_a_create_sender = user_id!("@sender:dummy.local");
2444
2445        // Room B: without name, creator and sender.
2446        let room_b_id = room_id!("!room_b:dummy.local");
2447
2448        // Room C: only with sender.
2449        let room_c_id = room_id!("!room_c:dummy.local");
2450        let room_c_create_sender = user_id!("@creator:dummy.local");
2451
2452        // Create and populate db.
2453        {
2454            let db = create_fake_db(&path, 2).await.unwrap();
2455            let conn = db.pool.get().await.unwrap();
2456
2457            let this = db.clone();
2458            conn.with_transaction(move |txn| {
2459                add_room_v2(
2460                    &this,
2461                    txn,
2462                    room_a_id,
2463                    Some(room_a_name),
2464                    Some(room_a_creator),
2465                    Some(room_a_create_sender),
2466                )?;
2467                add_room_v2(&this, txn, room_b_id, None, None, None)?;
2468                add_room_v2(&this, txn, room_c_id, None, None, Some(room_c_create_sender))?;
2469
2470                Result::<_, Error>::Ok(())
2471            })
2472            .await
2473            .unwrap();
2474        }
2475
2476        // This transparently migrates to the latest version.
2477        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2478
2479        // Check all room infos are there.
2480        let room_infos = store.get_room_infos().await.unwrap();
2481        assert_eq!(room_infos.len(), 3);
2482
2483        let room_a = room_infos.iter().find(|r| r.room_id() == room_a_id).unwrap();
2484        assert_eq!(room_a.name(), Some(room_a_name));
2485        assert_eq!(room_a.creator(), Some(room_a_create_sender));
2486
2487        let room_b = room_infos.iter().find(|r| r.room_id() == room_b_id).unwrap();
2488        assert_eq!(room_b.name(), None);
2489        assert_eq!(room_b.creator(), None);
2490
2491        let room_c = room_infos.iter().find(|r| r.room_id() == room_c_id).unwrap();
2492        assert_eq!(room_c.name(), None);
2493        assert_eq!(room_c.creator(), Some(room_c_create_sender));
2494    }
2495
2496    #[async_test]
2497    pub async fn test_migrating_v7_to_v9() {
2498        let path = new_path();
2499
2500        let room_id = room_id!("!room_a:dummy.local");
2501        let wedged_event_transaction_id = TransactionId::new();
2502        let local_event_transaction_id = TransactionId::new();
2503
2504        // Create and populate db.
2505        {
2506            let db = create_fake_db(&path, 7).await.unwrap();
2507            let conn = db.pool.get().await.unwrap();
2508
2509            let wedge_tx = wedged_event_transaction_id.clone();
2510            let local_tx = local_event_transaction_id.clone();
2511
2512            conn.with_transaction(move |txn| {
2513                add_dependent_send_queue_event_v7(
2514                    &db,
2515                    txn,
2516                    room_id,
2517                    &local_tx,
2518                    ChildTransactionId::new(),
2519                    DependentQueuedRequestKind::RedactEvent,
2520                )?;
2521                add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?;
2522                add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?;
2523                Result::<_, Error>::Ok(())
2524            })
2525            .await
2526            .unwrap();
2527        }
2528
2529        // This transparently migrates to the latest version, which clears up all
2530        // requests and dependent requests.
2531        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2532
2533        let requests = store.load_send_queue_requests(room_id).await.unwrap();
2534        assert!(requests.is_empty());
2535
2536        let dependent_requests = store.load_dependent_queued_requests(room_id).await.unwrap();
2537        assert!(dependent_requests.is_empty());
2538    }
2539
2540    fn add_send_queue_event_v7(
2541        this: &SqliteStateStore,
2542        txn: &Transaction<'_>,
2543        transaction_id: &TransactionId,
2544        room_id: &RoomId,
2545        is_wedged: bool,
2546    ) -> Result<(), Error> {
2547        let content =
2548            SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?;
2549
2550        let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id);
2551        let room_id_value = this.serialize_value(&room_id.to_owned())?;
2552
2553        let content = this.serialize_json(&content)?;
2554
2555        txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
2556            .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;
2557
2558        Ok(())
2559    }
2560
2561    fn add_dependent_send_queue_event_v7(
2562        this: &SqliteStateStore,
2563        txn: &Transaction<'_>,
2564        room_id: &RoomId,
2565        parent_txn_id: &TransactionId,
2566        own_txn_id: ChildTransactionId,
2567        content: DependentQueuedRequestKind,
2568    ) -> Result<(), Error> {
2569        let room_id_value = this.serialize_value(&room_id.to_owned())?;
2570
2571        let parent_txn_id = parent_txn_id.to_string();
2572        let own_txn_id = own_txn_id.to_string();
2573        let content = this.serialize_json(&content)?;
2574
2575        txn.prepare_cached(
2576            "INSERT INTO dependent_send_queue_events
2577                         (room_id, parent_transaction_id, own_transaction_id, content)
2578                       VALUES (?, ?, ?, ?)",
2579        )?
2580        .execute((room_id_value, parent_txn_id, own_txn_id, content))?;
2581
2582        Ok(())
2583    }
2584}