1use std::{
2 borrow::Cow,
3 collections::{BTreeMap, BTreeSet, HashMap},
4 fmt, iter,
5 path::Path,
6 sync::Arc,
7};
8
9use async_trait::async_trait;
10use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
11use matrix_sdk_base::{
12 deserialized_responses::{DisplayName, RawAnySyncOrStrippedState, SyncOrStrippedState},
13 store::{
14 migration_helpers::RoomInfoV1, ChildTransactionId, DependentQueuedRequest,
15 DependentQueuedRequestKind, QueueWedgeError, QueuedRequest, QueuedRequestKind,
16 SentRequestKey,
17 },
18 MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateChanges, StateStore,
19 StateStoreDataKey, StateStoreDataValue,
20};
21use matrix_sdk_store_encryption::StoreCipher;
22use ruma::{
23 canonical_json::{redact, RedactedBecause},
24 events::{
25 presence::PresenceEvent,
26 receipt::{Receipt, ReceiptThread, ReceiptType},
27 room::{
28 create::RoomCreateEventContent,
29 member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
30 },
31 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
32 GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
33 },
34 serde::Raw,
35 CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
36 OwnedTransactionId, OwnedUserId, RoomId, RoomVersionId, TransactionId, UInt, UserId,
37};
38use rusqlite::{OptionalExtension, Transaction};
39use serde::{de::DeserializeOwned, Deserialize, Serialize};
40use tokio::fs;
41use tracing::{debug, warn};
42
43use crate::{
44 error::{Error, Result},
45 utils::{
46 repeat_vars, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
47 SqliteKeyValueStoreConnExt,
48 },
49 OpenStoreError, SqliteStoreConfig,
50};
51
/// Namespace strings passed as the `table_name` argument of
/// `SqliteStateStore::encode_key`.
///
/// When a store cipher is configured, the namespace is handed to
/// `hash_key(table_name, ..)` together with the raw key bytes.
mod keys {
    // Namespace for entries stored in the generic `kv_blob` table.
    pub const KV_BLOB: &str = "kv_blob";
    // Namespaces for data stored in dedicated tables.
    pub const ROOM_INFO: &str = "room_info";
    pub const STATE_EVENT: &str = "state_event";
    pub const GLOBAL_ACCOUNT_DATA: &str = "global_account_data";
    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
    pub const MEMBER: &str = "member";
    pub const PROFILE: &str = "profile";
    pub const RECEIPT: &str = "receipt";
    pub const DISPLAY_NAME: &str = "display_name";
    pub const SEND_QUEUE: &str = "send_queue_events";
    pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";
}
66
/// File name of the database, joined onto the directory given in the store
/// configuration.
const DATABASE_NAME: &str = "matrix-sdk-state.sqlite3";

/// Schema version that `run_migrations` upgrades to when no explicit target
/// version is given.
const DATABASE_VERSION: u8 = 12;
75
/// A state store backed by a pool of SQLite connections.
#[derive(Clone)]
pub struct SqliteStateStore {
    // Present only when the store was opened with a passphrase; used to
    // encrypt values and hash keys.
    store_cipher: Option<Arc<StoreCipher>>,
    // Pool from which every operation acquires its connection.
    pool: SqlitePool,
}
82
83#[cfg(not(tarpaulin_include))]
84impl fmt::Debug for SqliteStateStore {
85 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86 f.debug_struct("SqliteStateStore").finish_non_exhaustive()
87 }
88}
89
90impl SqliteStateStore {
91 pub async fn open(
94 path: impl AsRef<Path>,
95 passphrase: Option<&str>,
96 ) -> Result<Self, OpenStoreError> {
97 Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
98 }
99
100 pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
102 let SqliteStoreConfig { path, passphrase, pool_config, runtime_config } = config;
103
104 fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;
105
106 let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
107 config.pool = Some(pool_config);
108
109 let pool = config.create_pool(Runtime::Tokio1)?;
110
111 let this = Self::open_with_pool(pool, passphrase.as_deref()).await?;
112 this.pool.get().await?.apply_runtime_config(runtime_config).await?;
113
114 Ok(this)
115 }
116
117 async fn open_with_pool(
120 pool: SqlitePool,
121 passphrase: Option<&str>,
122 ) -> Result<Self, OpenStoreError> {
123 let conn = pool.get().await?;
124
125 let mut version = conn.db_version().await?;
126
127 if version == 0 {
128 init(&conn).await?;
129 version = 1;
130 }
131
132 let store_cipher = match passphrase {
133 Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
134 None => None,
135 };
136 let this = Self { store_cipher, pool };
137 this.run_migrations(&conn, version, None).await?;
138
139 Ok(this)
140 }
141
142 async fn run_migrations(&self, conn: &SqliteAsyncConn, from: u8, to: Option<u8>) -> Result<()> {
147 let to = to.unwrap_or(DATABASE_VERSION);
148
149 if from < to {
150 debug!(version = from, new_version = to, "Upgrading database");
151 } else {
152 return Ok(());
153 }
154
155 if from < 2 && to >= 2 {
156 let this = self.clone();
157 conn.with_transaction(move |txn| {
158 txn.execute_batch(include_str!(
160 "../migrations/state_store/002_a_create_new_room_info.sql"
161 ))?;
162
163 for data in txn
165 .prepare("SELECT data FROM room_info")?
166 .query_map((), |row| row.get::<_, Vec<u8>>(0))?
167 {
168 let data = data?;
169 let room_info: RoomInfoV1 = this.deserialize_json(&data)?;
170
171 let room_id = this.encode_key(keys::ROOM_INFO, room_info.room_id());
172 let state = this
173 .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
174 txn.prepare_cached(
175 "INSERT OR REPLACE INTO new_room_info (room_id, state, data)
176 VALUES (?, ?, ?)",
177 )?
178 .execute((room_id, state, data))?;
179 }
180
181 txn.execute_batch(include_str!(
183 "../migrations/state_store/002_b_replace_room_info.sql"
184 ))?;
185
186 txn.set_db_version(2)?;
187 Result::<_, Error>::Ok(())
188 })
189 .await?;
190 }
191
192 if from < 3 && to >= 3 {
194 let this = self.clone();
195 conn.with_transaction(move |txn| {
196 for data in txn
198 .prepare("SELECT data FROM room_info")?
199 .query_map((), |row| row.get::<_, Vec<u8>>(0))?
200 {
201 let data = data?;
202 let room_info_v1: RoomInfoV1 = this.deserialize_json(&data)?;
203
204 let room_id = this.encode_key(keys::STATE_EVENT, room_info_v1.room_id());
206 let event_type =
207 this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
208 let create_res = txn
209 .prepare(
210 "SELECT stripped, data FROM state_event
211 WHERE room_id = ? AND event_type = ?",
212 )?
213 .query_row([room_id, event_type], |row| {
214 Ok((row.get::<_, bool>(0)?, row.get::<_, Vec<u8>>(1)?))
215 })
216 .optional()?;
217
218 let create = create_res.and_then(|(stripped, data)| {
219 let create = if stripped {
220 SyncOrStrippedState::<RoomCreateEventContent>::Stripped(
221 this.deserialize_json(&data).ok()?,
222 )
223 } else {
224 SyncOrStrippedState::Sync(this.deserialize_json(&data).ok()?)
225 };
226 Some(create)
227 });
228
229 let migrated_room_info = room_info_v1.migrate(create.as_ref());
230
231 let data = this.serialize_json(&migrated_room_info)?;
232 let room_id = this.encode_key(keys::ROOM_INFO, migrated_room_info.room_id());
233 txn.prepare_cached("UPDATE room_info SET data = ? WHERE room_id = ?")?
234 .execute((data, room_id))?;
235 }
236
237 txn.set_db_version(3)?;
238 Result::<_, Error>::Ok(())
239 })
240 .await?;
241 }
242
243 if from < 4 && to >= 4 {
244 conn.with_transaction(move |txn| {
245 txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
247 txn.set_db_version(4)
248 })
249 .await?;
250 }
251
252 if from < 5 && to >= 5 {
253 conn.with_transaction(move |txn| {
254 txn.execute_batch(include_str!(
256 "../migrations/state_store/004_send_queue_with_roomid_value.sql"
257 ))?;
258 txn.set_db_version(4)
259 })
260 .await?;
261 }
262
263 if from < 6 && to >= 6 {
264 conn.with_transaction(move |txn| {
265 txn.execute_batch(include_str!(
267 "../migrations/state_store/005_send_queue_dependent_events.sql"
268 ))?;
269 txn.set_db_version(6)
270 })
271 .await?;
272 }
273
274 if from < 7 && to >= 7 {
275 conn.with_transaction(move |txn| {
276 txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
278 txn.set_db_version(7)
279 })
280 .await?;
281 }
282
283 if from < 8 && to >= 8 {
284 let error = QueueWedgeError::GenericApiError {
286 msg: "local echo failed to send in a previous session".into(),
287 };
288 let default_err = self.serialize_value(&error)?;
289
290 conn.with_transaction(move |txn| {
291 txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?;
293
294 for wedged_entries in txn
297 .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")?
298 .query_map((), |row| {
299 Ok(
300 (row.get::<_, Vec<u8>>(0)?,row.get::<_, String>(1)?)
301 )
302 })? {
303
304 let (room_id, transaction_id) = wedged_entries?;
305
306 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?
307 .execute((default_err.clone(), room_id, transaction_id))?;
308 }
309
310
311 txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?;
313
314 txn.set_db_version(8)
315 })
316 .await?;
317 }
318
319 if from < 9 && to >= 9 {
320 conn.with_transaction(move |txn| {
321 txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?;
323 txn.set_db_version(9)
324 })
325 .await?;
326 }
327
328 if from < 10 && to >= 10 {
329 conn.with_transaction(move |txn| {
330 txn.execute_batch(include_str!(
332 "../migrations/state_store/009_send_queue_priority.sql"
333 ))?;
334 txn.set_db_version(10)
335 })
336 .await?;
337 }
338
339 if from < 11 && to >= 11 {
340 conn.with_transaction(move |txn| {
341 txn.execute_batch(include_str!(
343 "../migrations/state_store/010_send_queue_enqueue_time.sql"
344 ))?;
345 txn.set_db_version(11)
346 })
347 .await?;
348 }
349
350 if from < 12 && to >= 12 {
351 conn.vacuum().await?;
355 conn.set_kv("version", vec![12]).await?;
356 }
357
358 Ok(())
359 }
360
361 fn encode_value(&self, value: Vec<u8>) -> Result<Vec<u8>> {
362 if let Some(key) = &self.store_cipher {
363 let encrypted = key.encrypt_value_data(value)?;
364 Ok(rmp_serde::to_vec_named(&encrypted)?)
365 } else {
366 Ok(value)
367 }
368 }
369
370 fn serialize_value(&self, value: &impl Serialize) -> Result<Vec<u8>> {
371 let serialized = rmp_serde::to_vec_named(value)?;
372 self.encode_value(serialized)
373 }
374
375 fn serialize_json(&self, value: &impl Serialize) -> Result<Vec<u8>> {
376 let serialized = serde_json::to_vec(value)?;
377 self.encode_value(serialized)
378 }
379
380 fn decode_value<'a>(&self, value: &'a [u8]) -> Result<Cow<'a, [u8]>> {
381 if let Some(key) = &self.store_cipher {
382 let encrypted = rmp_serde::from_slice(value)?;
383 let decrypted = key.decrypt_value_data(encrypted)?;
384 Ok(Cow::Owned(decrypted))
385 } else {
386 Ok(Cow::Borrowed(value))
387 }
388 }
389
390 fn deserialize_json<T: DeserializeOwned>(&self, data: &[u8]) -> Result<T> {
391 let decoded = self.decode_value(data)?;
392 Ok(serde_json::from_slice(&decoded)?)
393 }
394
395 fn deserialize_value<T: DeserializeOwned>(&self, value: &[u8]) -> Result<T> {
396 let decoded = self.decode_value(value)?;
397 Ok(rmp_serde::from_slice(&decoded)?)
398 }
399
400 fn encode_key(&self, table_name: &str, key: impl AsRef<[u8]>) -> Key {
401 let bytes = key.as_ref();
402 if let Some(store_cipher) = &self.store_cipher {
403 Key::Hashed(store_cipher.hash_key(table_name, bytes))
404 } else {
405 Key::Plain(bytes.to_owned())
406 }
407 }
408
409 fn encode_state_store_data_key(&self, key: StateStoreDataKey<'_>) -> Key {
410 let key_s = match key {
411 StateStoreDataKey::SyncToken => Cow::Borrowed(StateStoreDataKey::SYNC_TOKEN),
412 StateStoreDataKey::ServerCapabilities => {
413 Cow::Borrowed(StateStoreDataKey::SERVER_CAPABILITIES)
414 }
415 StateStoreDataKey::Filter(f) => {
416 Cow::Owned(format!("{}:{f}", StateStoreDataKey::FILTER))
417 }
418 StateStoreDataKey::UserAvatarUrl(u) => {
419 Cow::Owned(format!("{}:{u}", StateStoreDataKey::USER_AVATAR_URL))
420 }
421 StateStoreDataKey::RecentlyVisitedRooms(b) => {
422 Cow::Owned(format!("{}:{b}", StateStoreDataKey::RECENTLY_VISITED_ROOMS))
423 }
424 StateStoreDataKey::UtdHookManagerData => {
425 Cow::Borrowed(StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
426 }
427 StateStoreDataKey::ComposerDraft(room_id) => {
428 Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::COMPOSER_DRAFT))
429 }
430 StateStoreDataKey::SeenKnockRequests(room_id) => {
431 Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::SEEN_KNOCK_REQUESTS))
432 }
433 };
434
435 self.encode_key(keys::KV_BLOB, &*key_s)
436 }
437
438 fn encode_presence_key(&self, user_id: &UserId) -> Key {
439 self.encode_key(keys::KV_BLOB, format!("presence:{user_id}"))
440 }
441
442 fn encode_custom_key(&self, key: &[u8]) -> Key {
443 let mut full_key = b"custom:".to_vec();
444 full_key.extend(key);
445 self.encode_key(keys::KV_BLOB, full_key)
446 }
447
448 async fn acquire(&self) -> Result<SqliteAsyncConn> {
449 Ok(self.pool.get().await?)
450 }
451
452 fn remove_maybe_stripped_room_data(
453 &self,
454 txn: &Transaction<'_>,
455 room_id: &RoomId,
456 stripped: bool,
457 ) -> rusqlite::Result<()> {
458 let state_event_room_id = self.encode_key(keys::STATE_EVENT, room_id);
459 txn.remove_room_state_events(&state_event_room_id, Some(stripped))?;
460
461 let member_room_id = self.encode_key(keys::MEMBER, room_id);
462 txn.remove_room_members(&member_room_id, Some(stripped))
463 }
464}
465
466async fn init(conn: &SqliteAsyncConn) -> Result<()> {
468 conn.execute_batch("PRAGMA journal_mode = wal;").await?;
471 conn.with_transaction(|txn| {
472 txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
473 txn.set_db_version(1)?;
474
475 Ok(())
476 })
477 .await
478}
479
/// Synchronous, table-level write and lookup helpers for the state store.
///
/// All keys and values are passed as raw byte slices; any encoding or
/// encryption is expected to have been applied by the caller.
trait SqliteConnectionStateStoreExt {
    /// Insert or replace an entry of the `kv_blob` table.
    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace a global account data entry.
    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace a room account data entry.
    fn set_room_account_data(
        &self,
        room_id: &[u8],
        event_type: &[u8],
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Delete all account data of a room.
    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the info of a room.
    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Fetch the stored info of a room, if any.
    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>>;
    /// Delete the info of a room.
    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace a state event.
    fn set_state_event(
        &self,
        room_id: &[u8],
        event_type: &[u8],
        state_key: &[u8],
        stripped: bool,
        event_id: Option<&[u8]>,
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Fetch a state event of a room by its event ID, if any.
    fn get_state_event_by_id(
        &self,
        room_id: &[u8],
        event_id: &[u8],
    ) -> rusqlite::Result<Option<Vec<u8>>>;
    /// Delete state events of a room; `Some(flag)` restricts the deletion to
    /// rows whose `stripped` column equals `flag`.
    fn remove_room_state_events(
        &self,
        room_id: &[u8],
        stripped: Option<bool>,
    ) -> rusqlite::Result<()>;

    /// Insert or replace a room member.
    fn set_member(
        &self,
        room_id: &[u8],
        user_id: &[u8],
        membership: &[u8],
        stripped: bool,
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Delete members of a room; `Some(flag)` restricts the deletion to rows
    /// whose `stripped` column equals `flag`.
    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()>;

    /// Insert or replace a user's profile in a room.
    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Delete all profiles of a room.
    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Delete a single user's profile in a room.
    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace a receipt.
    fn set_receipt(
        &self,
        room_id: &[u8],
        user_id: &[u8],
        receipt_type: &[u8],
        thread_id: &[u8],
        event_id: &[u8],
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Delete all receipts of a room.
    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the data stored for a display name in a room.
    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Delete the data stored for a single display name in a room.
    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()>;
    /// Delete all display name data of a room.
    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Delete all send queue entries of a room.
    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Delete all dependent send queue entries of a room.
    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
}
548
549impl SqliteConnectionStateStoreExt for rusqlite::Connection {
550 fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()> {
551 self.execute("INSERT OR REPLACE INTO kv_blob VALUES (?, ?)", (key, value))?;
552 Ok(())
553 }
554
555 fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()> {
556 self.prepare_cached(
557 "INSERT OR REPLACE INTO global_account_data (event_type, data)
558 VALUES (?, ?)",
559 )?
560 .execute((event_type, data))?;
561 Ok(())
562 }
563
564 fn set_room_account_data(
565 &self,
566 room_id: &[u8],
567 event_type: &[u8],
568 data: &[u8],
569 ) -> rusqlite::Result<()> {
570 self.prepare_cached(
571 "INSERT OR REPLACE INTO room_account_data (room_id, event_type, data)
572 VALUES (?, ?, ?)",
573 )?
574 .execute((room_id, event_type, data))?;
575 Ok(())
576 }
577
578 fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()> {
579 self.prepare(
580 "DELETE FROM room_account_data
581 WHERE room_id = ?",
582 )?
583 .execute((room_id,))?;
584 Ok(())
585 }
586
587 fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()> {
588 self.prepare_cached(
589 "INSERT OR REPLACE INTO room_info (room_id, state, data)
590 VALUES (?, ?, ?)",
591 )?
592 .execute((room_id, state, data))?;
593 Ok(())
594 }
595
596 fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>> {
597 self.query_row("SELECT data FROM room_info WHERE room_id = ?", (room_id,), |row| row.get(0))
598 .optional()
599 }
600
601 fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()> {
603 self.prepare_cached("DELETE FROM room_info WHERE room_id = ?")?.execute((room_id,))?;
604 Ok(())
605 }
606
607 fn set_state_event(
608 &self,
609 room_id: &[u8],
610 event_type: &[u8],
611 state_key: &[u8],
612 stripped: bool,
613 event_id: Option<&[u8]>,
614 data: &[u8],
615 ) -> rusqlite::Result<()> {
616 self.prepare_cached(
617 "INSERT OR REPLACE
618 INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
619 VALUES (?, ?, ?, ?, ?, ?)",
620 )?
621 .execute((room_id, event_type, state_key, stripped, event_id, data))?;
622 Ok(())
623 }
624
625 fn get_state_event_by_id(
626 &self,
627 room_id: &[u8],
628 event_id: &[u8],
629 ) -> rusqlite::Result<Option<Vec<u8>>> {
630 self.query_row(
631 "SELECT data FROM state_event WHERE room_id = ? AND event_id = ?",
632 (room_id, event_id),
633 |row| row.get(0),
634 )
635 .optional()
636 }
637
638 fn remove_room_state_events(
644 &self,
645 room_id: &[u8],
646 stripped: Option<bool>,
647 ) -> rusqlite::Result<()> {
648 if let Some(stripped) = stripped {
649 self.prepare_cached("DELETE FROM state_event WHERE room_id = ? AND stripped = ?")?
650 .execute((room_id, stripped))?;
651 } else {
652 self.prepare_cached("DELETE FROM state_event WHERE room_id = ?")?
653 .execute((room_id,))?;
654 }
655 Ok(())
656 }
657
658 fn set_member(
659 &self,
660 room_id: &[u8],
661 user_id: &[u8],
662 membership: &[u8],
663 stripped: bool,
664 data: &[u8],
665 ) -> rusqlite::Result<()> {
666 self.prepare_cached(
667 "INSERT OR REPLACE
668 INTO member (room_id, user_id, membership, stripped, data)
669 VALUES (?, ?, ?, ?, ?)",
670 )?
671 .execute((room_id, user_id, membership, stripped, data))?;
672 Ok(())
673 }
674
675 fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()> {
680 if let Some(stripped) = stripped {
681 self.prepare_cached("DELETE FROM member WHERE room_id = ? AND stripped = ?")?
682 .execute((room_id, stripped))?;
683 } else {
684 self.prepare_cached("DELETE FROM member WHERE room_id = ?")?.execute((room_id,))?;
685 }
686 Ok(())
687 }
688
689 fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
690 self.prepare_cached(
691 "INSERT OR REPLACE
692 INTO profile (room_id, user_id, data)
693 VALUES (?, ?, ?)",
694 )?
695 .execute((room_id, user_id, data))?;
696 Ok(())
697 }
698
699 fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()> {
700 self.prepare("DELETE FROM profile WHERE room_id = ?")?.execute((room_id,))?;
701 Ok(())
702 }
703
704 fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()> {
705 self.prepare("DELETE FROM profile WHERE room_id = ? AND user_id = ?")?
706 .execute((room_id, user_id))?;
707 Ok(())
708 }
709
710 fn set_receipt(
711 &self,
712 room_id: &[u8],
713 user_id: &[u8],
714 receipt_type: &[u8],
715 thread: &[u8],
716 event_id: &[u8],
717 data: &[u8],
718 ) -> rusqlite::Result<()> {
719 self.prepare_cached(
720 "INSERT OR REPLACE
721 INTO receipt (room_id, user_id, receipt_type, thread, event_id, data)
722 VALUES (?, ?, ?, ?, ?, ?)",
723 )?
724 .execute((room_id, user_id, receipt_type, thread, event_id, data))?;
725 Ok(())
726 }
727
728 fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()> {
729 self.prepare("DELETE FROM receipt WHERE room_id = ?")?.execute((room_id,))?;
730 Ok(())
731 }
732
733 fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
734 self.prepare_cached(
735 "INSERT OR REPLACE
736 INTO display_name (room_id, name, data)
737 VALUES (?, ?, ?)",
738 )?
739 .execute((room_id, name, data))?;
740 Ok(())
741 }
742
743 fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()> {
744 self.prepare("DELETE FROM display_name WHERE room_id = ? AND name = ?")?
745 .execute((room_id, name))?;
746 Ok(())
747 }
748
749 fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()> {
750 self.prepare("DELETE FROM display_name WHERE room_id = ?")?.execute((room_id,))?;
751 Ok(())
752 }
753
754 fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
755 self.prepare("DELETE FROM send_queue_events WHERE room_id = ?")?.execute((room_id,))?;
756 Ok(())
757 }
758
759 fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
760 self.prepare("DELETE FROM dependent_send_queue_events WHERE room_id = ?")?
761 .execute((room_id,))?;
762 Ok(())
763 }
764}
765
766#[async_trait]
767trait SqliteObjectStateStoreExt: SqliteAsyncConnExt {
768 async fn get_kv_blob(&self, key: Key) -> Result<Option<Vec<u8>>> {
769 Ok(self
770 .query_row("SELECT value FROM kv_blob WHERE key = ?", (key,), |row| row.get(0))
771 .await
772 .optional()?)
773 }
774
775 async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
776 let keys_length = keys.len();
777
778 self.chunk_large_query_over(keys, Some(keys_length), |txn, keys| {
779 let sql_params = repeat_vars(keys.len());
780 let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");
781
782 let params = rusqlite::params_from_iter(keys);
783
784 Ok(txn
785 .prepare(&sql)?
786 .query(params)?
787 .mapped(|row| row.get(0))
788 .collect::<Result<_, _>>()?)
789 })
790 .await
791 }
792
793 async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;
794
795 async fn delete_kv_blob(&self, key: Key) -> Result<()> {
796 self.execute("DELETE FROM kv_blob WHERE key = ?", (key,)).await?;
797 Ok(())
798 }
799
800 async fn get_room_infos(&self) -> Result<Vec<Vec<u8>>> {
801 Ok(self
802 .prepare("SELECT data FROM room_info", move |mut stmt| {
803 stmt.query_map((), |row| row.get(0))?.collect()
804 })
805 .await?)
806 }
807
808 async fn get_maybe_stripped_state_events_for_keys(
809 &self,
810 room_id: Key,
811 event_type: Key,
812 state_keys: Vec<Key>,
813 ) -> Result<Vec<(bool, Vec<u8>)>> {
814 self.chunk_large_query_over(state_keys, None, move |txn, state_keys: Vec<Key>| {
815 let sql_params = repeat_vars(state_keys.len());
816 let sql = format!(
817 "SELECT stripped, data FROM state_event
818 WHERE room_id = ? AND event_type = ? AND state_key IN ({sql_params})"
819 );
820
821 let params = rusqlite::params_from_iter(
822 [room_id.clone(), event_type.clone()].into_iter().chain(state_keys),
823 );
824
825 Ok(txn
826 .prepare(&sql)?
827 .query(params)?
828 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
829 .collect::<Result<_, _>>()?)
830 })
831 .await
832 }
833
834 async fn get_maybe_stripped_state_events(
835 &self,
836 room_id: Key,
837 event_type: Key,
838 ) -> Result<Vec<(bool, Vec<u8>)>> {
839 Ok(self
840 .prepare(
841 "SELECT stripped, data FROM state_event
842 WHERE room_id = ? AND event_type = ?",
843 |mut stmt| {
844 stmt.query((room_id, event_type))?
845 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
846 .collect()
847 },
848 )
849 .await?)
850 }
851
852 async fn get_profiles(
853 &self,
854 room_id: Key,
855 user_ids: Vec<Key>,
856 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
857 let user_ids_length = user_ids.len();
858
859 self.chunk_large_query_over(user_ids, Some(user_ids_length), move |txn, user_ids| {
860 let sql_params = repeat_vars(user_ids.len());
861 let sql = format!(
862 "SELECT user_id, data FROM profile WHERE room_id = ? AND user_id IN ({sql_params})"
863 );
864
865 let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(user_ids));
866
867 Ok(txn
868 .prepare(&sql)?
869 .query(params)?
870 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
871 .collect::<Result<_, _>>()?)
872 })
873 .await
874 }
875
876 async fn get_user_ids(&self, room_id: Key, memberships: Vec<Key>) -> Result<Vec<Vec<u8>>> {
877 let res = if memberships.is_empty() {
878 self.prepare("SELECT data FROM member WHERE room_id = ?", |mut stmt| {
879 stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
880 })
881 .await?
882 } else {
883 self.chunk_large_query_over(memberships, None, move |txn, memberships| {
884 let sql_params = repeat_vars(memberships.len());
885 let sql = format!(
886 "SELECT data FROM member WHERE room_id = ? AND membership IN ({sql_params})"
887 );
888
889 let params =
890 rusqlite::params_from_iter(iter::once(room_id.clone()).chain(memberships));
891
892 Ok(txn
893 .prepare(&sql)?
894 .query(params)?
895 .mapped(|row| row.get(0))
896 .collect::<Result<_, _>>()?)
897 })
898 .await?
899 };
900
901 Ok(res)
902 }
903
904 async fn get_global_account_data(&self, event_type: Key) -> Result<Option<Vec<u8>>> {
905 Ok(self
906 .query_row(
907 "SELECT data FROM global_account_data WHERE event_type = ?",
908 (event_type,),
909 |row| row.get(0),
910 )
911 .await
912 .optional()?)
913 }
914
915 async fn get_room_account_data(
916 &self,
917 room_id: Key,
918 event_type: Key,
919 ) -> Result<Option<Vec<u8>>> {
920 Ok(self
921 .query_row(
922 "SELECT data FROM room_account_data WHERE room_id = ? AND event_type = ?",
923 (room_id, event_type),
924 |row| row.get(0),
925 )
926 .await
927 .optional()?)
928 }
929
930 async fn get_display_names(
931 &self,
932 room_id: Key,
933 names: Vec<Key>,
934 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
935 let names_length = names.len();
936
937 self.chunk_large_query_over(names, Some(names_length), move |txn, names| {
938 let sql_params = repeat_vars(names.len());
939 let sql = format!(
940 "SELECT name, data FROM display_name WHERE room_id = ? AND name IN ({sql_params})"
941 );
942
943 let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(names));
944
945 Ok(txn
946 .prepare(&sql)?
947 .query(params)?
948 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
949 .collect::<Result<_, _>>()?)
950 })
951 .await
952 }
953
954 async fn get_user_receipt(
955 &self,
956 room_id: Key,
957 receipt_type: Key,
958 thread: Key,
959 user_id: Key,
960 ) -> Result<Option<Vec<u8>>> {
961 Ok(self
962 .query_row(
963 "SELECT data FROM receipt
964 WHERE room_id = ? AND receipt_type = ? AND thread = ? and user_id = ?",
965 (room_id, receipt_type, thread, user_id),
966 |row| row.get(0),
967 )
968 .await
969 .optional()?)
970 }
971
972 async fn get_event_receipts(
973 &self,
974 room_id: Key,
975 receipt_type: Key,
976 thread: Key,
977 event_id: Key,
978 ) -> Result<Vec<Vec<u8>>> {
979 Ok(self
980 .prepare(
981 "SELECT data FROM receipt
982 WHERE room_id = ? AND receipt_type = ? AND thread = ? and event_id = ?",
983 |mut stmt| {
984 stmt.query((room_id, receipt_type, thread, event_id))?
985 .mapped(|row| row.get(0))
986 .collect()
987 },
988 )
989 .await?)
990 }
991}
992
993#[async_trait]
994impl SqliteObjectStateStoreExt for SqliteAsyncConn {
995 async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()> {
996 Ok(self.interact(move |conn| conn.set_kv_blob(&key, &value)).await.unwrap()?)
997 }
998}
999
1000#[async_trait]
1001impl StateStore for SqliteStateStore {
1002 type Error = Error;
1003
1004 async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
1005 self.acquire()
1006 .await?
1007 .get_kv_blob(self.encode_state_store_data_key(key))
1008 .await?
1009 .map(|data| {
1010 Ok(match key {
1011 StateStoreDataKey::SyncToken => {
1012 StateStoreDataValue::SyncToken(self.deserialize_value(&data)?)
1013 }
1014 StateStoreDataKey::ServerCapabilities => {
1015 StateStoreDataValue::ServerCapabilities(self.deserialize_value(&data)?)
1016 }
1017 StateStoreDataKey::Filter(_) => {
1018 StateStoreDataValue::Filter(self.deserialize_value(&data)?)
1019 }
1020 StateStoreDataKey::UserAvatarUrl(_) => {
1021 StateStoreDataValue::UserAvatarUrl(self.deserialize_value(&data)?)
1022 }
1023 StateStoreDataKey::RecentlyVisitedRooms(_) => {
1024 StateStoreDataValue::RecentlyVisitedRooms(self.deserialize_value(&data)?)
1025 }
1026 StateStoreDataKey::UtdHookManagerData => {
1027 StateStoreDataValue::UtdHookManagerData(self.deserialize_value(&data)?)
1028 }
1029 StateStoreDataKey::ComposerDraft(_) => {
1030 StateStoreDataValue::ComposerDraft(self.deserialize_value(&data)?)
1031 }
1032 StateStoreDataKey::SeenKnockRequests(_) => {
1033 StateStoreDataValue::SeenKnockRequests(self.deserialize_value(&data)?)
1034 }
1035 })
1036 })
1037 .transpose()
1038 }
1039
1040 async fn set_kv_data(
1041 &self,
1042 key: StateStoreDataKey<'_>,
1043 value: StateStoreDataValue,
1044 ) -> Result<()> {
1045 let serialized_value = match key {
1046 StateStoreDataKey::SyncToken => self.serialize_value(
1047 &value.into_sync_token().expect("Session data not a sync token"),
1048 )?,
1049 StateStoreDataKey::ServerCapabilities => self.serialize_value(
1050 &value
1051 .into_server_capabilities()
1052 .expect("Session data not containing server capabilities"),
1053 )?,
1054 StateStoreDataKey::Filter(_) => {
1055 self.serialize_value(&value.into_filter().expect("Session data not a filter"))?
1056 }
1057 StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
1058 &value.into_user_avatar_url().expect("Session data not an user avatar url"),
1059 )?,
1060 StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
1061 &value.into_recently_visited_rooms().expect("Session data not breadcrumbs"),
1062 )?,
1063 StateStoreDataKey::UtdHookManagerData => self.serialize_value(
1064 &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
1065 )?,
1066 StateStoreDataKey::ComposerDraft(_) => self.serialize_value(
1067 &value.into_composer_draft().expect("Session data not a composer draft"),
1068 )?,
1069 StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
1070 &value
1071 .into_seen_knock_requests()
1072 .expect("Session data is not a set of seen knock request ids"),
1073 )?,
1074 };
1075
1076 self.acquire()
1077 .await?
1078 .set_kv_blob(self.encode_state_store_data_key(key), serialized_value)
1079 .await
1080 }
1081
1082 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
1083 self.acquire().await?.delete_kv_blob(self.encode_state_store_data_key(key)).await
1084 }
1085
/// Persists every part of `changes` inside a single database transaction.
///
/// The changes are cloned and moved, together with a clone of the store,
/// into the closure passed to `with_transaction`. Each field of
/// [`StateChanges`] is then written in turn: sync token, global and room
/// account data, presence, room infos, profile deletions, state events
/// (with derived member and profile rows), stripped state events, receipts,
/// redactions and display-name ambiguity maps.
///
/// # Errors
///
/// Returns the first serialization, redaction or database error raised by
/// the closure or by acquiring the connection.
async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
    let changes = changes.to_owned();
    let this = self.clone();
    self.acquire()
        .await?
        .with_transaction(move |txn| {
            // Destructure exhaustively so that a new `StateChanges` field
            // cannot be forgotten here without a compile error.
            let StateChanges {
                sync_token,
                account_data,
                presence,
                profiles,
                profiles_to_delete,
                state,
                room_account_data,
                room_infos,
                receipts,
                redactions,
                stripped_state,
                ambiguity_maps,
            } = changes;

            // The sync token lives in the generic key-value table.
            if let Some(sync_token) = sync_token {
                let key = this.encode_state_store_data_key(StateStoreDataKey::SyncToken);
                let value = this.serialize_value(&sync_token)?;
                txn.set_kv_blob(&key, &value)?;
            }

            // Global account data, keyed by event type.
            for (event_type, event) in account_data {
                let event_type =
                    this.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
                let data = this.serialize_json(&event)?;
                txn.set_global_account_data(&event_type, &data)?;
            }

            // Room account data, keyed by (room, event type).
            for (room_id, events) in room_account_data {
                let room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
                for (event_type, event) in events {
                    let event_type =
                        this.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
                    let data = this.serialize_json(&event)?;
                    txn.set_room_account_data(&room_id, &event_type, &data)?;
                }
            }

            // Presence events are stored as key-value blobs under a
            // per-user presence key.
            for (user_id, event) in presence {
                let key = this.encode_presence_key(&user_id);
                let value = this.serialize_json(&event)?;
                txn.set_kv_blob(&key, &value)?;
            }

            for (room_id, room_info) in room_infos {
                // A room in the invited state is treated as "stripped".
                let stripped = room_info.state() == RoomState::Invited;
                // NOTE(review): presumably this clears the data of the
                // opposite kind (stripped vs. non-stripped) for the room —
                // confirm against `remove_maybe_stripped_room_data`.
                this.remove_maybe_stripped_room_data(txn, &room_id, !stripped)?;

                let room_id = this.encode_key(keys::ROOM_INFO, room_id);
                let state = this
                    .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
                let data = this.serialize_json(&room_info)?;
                txn.set_room_info(&room_id, &state, &data)?;
            }

            // Profile deletions run before the state loop below, which may
            // store new profiles.
            for (room_id, user_ids) in profiles_to_delete {
                let room_id = this.encode_key(keys::PROFILE, room_id);
                for user_id in user_ids {
                    let user_id = this.encode_key(keys::PROFILE, user_id);
                    txn.remove_room_profile(&room_id, &user_id)?;
                }
            }

            for (room_id, state_event_types) in state {
                // Profiles that came with this batch for the current room,
                // if any.
                let profiles = profiles.get(&room_id);
                let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);

                for (event_type, state_events) in state_event_types {
                    let encoded_event_type =
                        this.encode_key(keys::STATE_EVENT, event_type.to_string());

                    for (state_key, raw_state_event) in state_events {
                        let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
                        let data = this.serialize_json(&raw_state_event)?;

                        // The event ID is optional: a missing or unreadable
                        // `event_id` field yields `None`.
                        let event_id: Option<String> =
                            raw_state_event.get_field("event_id").ok().flatten();
                        let encoded_event_id =
                            event_id.as_ref().map(|e| this.encode_key(keys::STATE_EVENT, e));

                        // `false`: this is a full (non-stripped) state event.
                        txn.set_state_event(
                            &encoded_room_id,
                            &encoded_event_type,
                            &encoded_state_key,
                            false,
                            encoded_event_id.as_deref(),
                            &data,
                        )?;

                        if event_type == StateEventType::RoomMember {
                            // The raw event was already stored above; a
                            // member event that fails to deserialize only
                            // skips the member/profile rows.
                            let member_event = match raw_state_event
                                .deserialize_as::<SyncRoomMemberEvent>()
                            {
                                Ok(ev) => ev,
                                Err(e) => {
                                    debug!(event_id, "Failed to deserialize member event: {e}");
                                    continue;
                                }
                            };

                            let encoded_room_id = this.encode_key(keys::MEMBER, &room_id);
                            let user_id = this.encode_key(keys::MEMBER, &state_key);
                            let membership = this
                                .encode_key(keys::MEMBER, member_event.membership().as_str());
                            // The stored value is the serialized state key
                            // (the member's user ID).
                            let data = this.serialize_value(&state_key)?;

                            txn.set_member(
                                &encoded_room_id,
                                &user_id,
                                &membership,
                                false,
                                &data,
                            )?;

                            if let Some(profile) =
                                profiles.and_then(|p| p.get(member_event.state_key()))
                            {
                                let room_id = this.encode_key(keys::PROFILE, &room_id);
                                let user_id = this.encode_key(keys::PROFILE, &state_key);
                                let data = this.serialize_json(&profile)?;
                                txn.set_profile(&room_id, &user_id, &data)?;
                            }
                        }
                    }
                }
            }

            for (room_id, stripped_state_event_types) in stripped_state {
                let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);

                for (event_type, stripped_state_events) in stripped_state_event_types {
                    let encoded_event_type =
                        this.encode_key(keys::STATE_EVENT, event_type.to_string());

                    for (state_key, raw_stripped_state_event) in stripped_state_events {
                        let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
                        let data = this.serialize_json(&raw_stripped_state_event)?;
                        // `true`: stripped event; no event ID is stored for it.
                        txn.set_state_event(
                            &encoded_room_id,
                            &encoded_event_type,
                            &encoded_state_key,
                            true,
                            None,
                            &data,
                        )?;

                        if event_type == StateEventType::RoomMember {
                            // As for full state: a deserialization failure
                            // only skips the member row.
                            let member_event = match raw_stripped_state_event
                                .deserialize_as::<StrippedRoomMemberEvent>(
                            ) {
                                Ok(ev) => ev,
                                Err(e) => {
                                    debug!("Failed to deserialize stripped member event: {e}");
                                    continue;
                                }
                            };

                            let room_id = this.encode_key(keys::MEMBER, &room_id);
                            let user_id = this.encode_key(keys::MEMBER, &state_key);
                            let membership = this.encode_key(
                                keys::MEMBER,
                                member_event.content.membership.as_str(),
                            );
                            let data = this.serialize_value(&state_key)?;

                            txn.set_member(&room_id, &user_id, &membership, true, &data)?;
                        }
                    }
                }
            }

            for (room_id, receipt_event) in receipts {
                let room_id = this.encode_key(keys::RECEIPT, room_id);

                for (event_id, receipt_types) in receipt_event {
                    let encoded_event_id = this.encode_key(keys::RECEIPT, &event_id);

                    for (receipt_type, receipt_users) in receipt_types {
                        let receipt_type =
                            this.encode_key(keys::RECEIPT, receipt_type.as_str());

                        for (user_id, receipt) in receipt_users {
                            let encoded_user_id = this.encode_key(keys::RECEIPT, &user_id);
                            // The thread is encoded from its MessagePack
                            // form; the receipt getters encode it the same
                            // way when querying.
                            let thread = this.encode_key(
                                keys::RECEIPT,
                                rmp_serde::to_vec_named(&receipt.thread)?,
                            );
                            // The stored blob carries the event and user IDs
                            // so they can be recovered when reading.
                            let data = this.serialize_json(&ReceiptData {
                                receipt,
                                event_id: event_id.clone(),
                                user_id,
                            })?;

                            txn.set_receipt(
                                &room_id,
                                &encoded_user_id,
                                &receipt_type,
                                &thread,
                                &encoded_event_id,
                                &data,
                            )?;
                        }
                    }
                }
            }

            for (room_id, redactions) in redactions {
                // Looks up the room version from the stored room info and
                // falls back to version 9, with a warning, when it cannot
                // be found or read.
                let make_room_version = || {
                    let encoded_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
                    txn.get_room_info(&encoded_room_id)
                        .ok()
                        .flatten()
                        .and_then(|v| this.deserialize_json::<RoomInfo>(&v).ok())
                        .and_then(|info| info.room_version().cloned())
                        .unwrap_or_else(|| {
                            warn!(
                                ?room_id,
                                "Unable to find the room version, assume version 9"
                            );
                            RoomVersionId::V9
                        })
                };

                let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
                // Filled lazily, at most once per room, by the first
                // redaction that matches a stored state event.
                let mut room_version = None;

                for (event_id, redaction) in redactions {
                    let event_id = this.encode_key(keys::STATE_EVENT, event_id);

                    // Only redactions whose target is a stored state event
                    // that still deserializes as raw JSON are applied.
                    if let Some(Ok(raw_event)) = txn
                        .get_state_event_by_id(&encoded_room_id, &event_id)?
                        .map(|value| this.deserialize_json::<Raw<AnySyncStateEvent>>(&value))
                    {
                        let event = raw_event.deserialize()?;
                        let redacted = redact(
                            raw_event.deserialize_as::<CanonicalJsonObject>()?,
                            room_version.get_or_insert_with(make_room_version),
                            Some(RedactedBecause::from_raw_event(&redaction)?),
                        )
                        .map_err(Error::Redaction)?;
                        let data = this.serialize_json(&redacted)?;

                        let event_type =
                            this.encode_key(keys::STATE_EVENT, event.event_type().to_string());
                        let state_key = this.encode_key(keys::STATE_EVENT, event.state_key());

                        // Overwrite the stored event with its redacted form.
                        txn.set_state_event(
                            &encoded_room_id,
                            &event_type,
                            &state_key,
                            false,
                            Some(&event_id),
                            &data,
                        )?;
                    }
                }
            }

            for (room_id, display_names) in ambiguity_maps {
                let room_id = this.encode_key(keys::DISPLAY_NAME, room_id);

                for (name, user_ids) in display_names {
                    // Entries are keyed by the normalized display name when
                    // one exists, otherwise by the raw name.
                    let encoded_name = this.encode_key(
                        keys::DISPLAY_NAME,
                        name.as_normalized_str().unwrap_or_else(|| name.as_raw_str()),
                    );
                    let data = this.serialize_json(&user_ids)?;

                    if user_ids.is_empty() {
                        txn.remove_display_name(&room_id, &encoded_name)?;

                        // Also remove any entry stored under the raw name.
                        // NOTE(review): presumably this covers entries
                        // written before names were normalized — confirm.
                        let raw_name = this.encode_key(keys::DISPLAY_NAME, name.as_raw_str());
                        txn.remove_display_name(&room_id, &raw_name)?;
                    } else {
                        txn.set_display_name(&room_id, &encoded_name, &data)?;
                    }
                }
            }

            Ok::<_, Error>(())
        })
        .await?;

    Ok(())
}
1395
1396 async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1397 self.acquire()
1398 .await?
1399 .get_kv_blob(self.encode_presence_key(user_id))
1400 .await?
1401 .map(|data| self.deserialize_json(&data))
1402 .transpose()
1403 }
1404
1405 async fn get_presence_events(
1406 &self,
1407 user_ids: &[OwnedUserId],
1408 ) -> Result<Vec<Raw<PresenceEvent>>> {
1409 if user_ids.is_empty() {
1410 return Ok(Vec::new());
1411 }
1412
1413 let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
1414 self.acquire()
1415 .await?
1416 .get_kv_blobs(user_ids)
1417 .await?
1418 .into_iter()
1419 .map(|data| self.deserialize_json(&data))
1420 .collect()
1421 }
1422
1423 async fn get_state_event(
1424 &self,
1425 room_id: &RoomId,
1426 event_type: StateEventType,
1427 state_key: &str,
1428 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1429 Ok(self
1430 .get_state_events_for_keys(room_id, event_type, &[state_key])
1431 .await?
1432 .into_iter()
1433 .next())
1434 }
1435
1436 async fn get_state_events(
1437 &self,
1438 room_id: &RoomId,
1439 event_type: StateEventType,
1440 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1441 let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1442 let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1443 self.acquire()
1444 .await?
1445 .get_maybe_stripped_state_events(room_id, event_type)
1446 .await?
1447 .into_iter()
1448 .map(|(stripped, data)| {
1449 let ev = if stripped {
1450 RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1451 } else {
1452 RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1453 };
1454
1455 Ok(ev)
1456 })
1457 .collect()
1458 }
1459
1460 async fn get_state_events_for_keys(
1461 &self,
1462 room_id: &RoomId,
1463 event_type: StateEventType,
1464 state_keys: &[&str],
1465 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1466 if state_keys.is_empty() {
1467 return Ok(Vec::new());
1468 }
1469
1470 let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1471 let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1472 let state_keys = state_keys.iter().map(|k| self.encode_key(keys::STATE_EVENT, k)).collect();
1473 self.acquire()
1474 .await?
1475 .get_maybe_stripped_state_events_for_keys(room_id, event_type, state_keys)
1476 .await?
1477 .into_iter()
1478 .map(|(stripped, data)| {
1479 let ev = if stripped {
1480 RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1481 } else {
1482 RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1483 };
1484
1485 Ok(ev)
1486 })
1487 .collect()
1488 }
1489
1490 async fn get_profile(
1491 &self,
1492 room_id: &RoomId,
1493 user_id: &UserId,
1494 ) -> Result<Option<MinimalRoomMemberEvent>> {
1495 let room_id = self.encode_key(keys::PROFILE, room_id);
1496 let user_ids = vec![self.encode_key(keys::PROFILE, user_id)];
1497
1498 self.acquire()
1499 .await?
1500 .get_profiles(room_id, user_ids)
1501 .await?
1502 .into_iter()
1503 .next()
1504 .map(|(_, data)| self.deserialize_json(&data))
1505 .transpose()
1506 }
1507
1508 async fn get_profiles<'a>(
1509 &self,
1510 room_id: &RoomId,
1511 user_ids: &'a [OwnedUserId],
1512 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1513 if user_ids.is_empty() {
1514 return Ok(BTreeMap::new());
1515 }
1516
1517 let room_id = self.encode_key(keys::PROFILE, room_id);
1518 let mut user_ids_map = user_ids
1519 .iter()
1520 .map(|u| (self.encode_key(keys::PROFILE, u), u.as_ref()))
1521 .collect::<BTreeMap<_, _>>();
1522 let user_ids = user_ids_map.keys().cloned().collect();
1523
1524 self.acquire()
1525 .await?
1526 .get_profiles(room_id, user_ids)
1527 .await?
1528 .into_iter()
1529 .map(|(user_id, data)| {
1530 Ok((
1531 user_ids_map
1532 .remove(user_id.as_slice())
1533 .expect("returned user IDs were requested"),
1534 self.deserialize_json(&data)?,
1535 ))
1536 })
1537 .collect()
1538 }
1539
1540 async fn get_user_ids(
1541 &self,
1542 room_id: &RoomId,
1543 membership: RoomMemberships,
1544 ) -> Result<Vec<OwnedUserId>> {
1545 let room_id = self.encode_key(keys::MEMBER, room_id);
1546 let memberships = membership
1547 .as_vec()
1548 .into_iter()
1549 .map(|m| self.encode_key(keys::MEMBER, m.as_str()))
1550 .collect();
1551 self.acquire()
1552 .await?
1553 .get_user_ids(room_id, memberships)
1554 .await?
1555 .iter()
1556 .map(|data| self.deserialize_value(data))
1557 .collect()
1558 }
1559
1560 async fn get_room_infos(&self) -> Result<Vec<RoomInfo>> {
1561 self.acquire()
1562 .await?
1563 .get_room_infos()
1564 .await?
1565 .into_iter()
1566 .map(|data| self.deserialize_json(&data))
1567 .collect()
1568 }
1569
1570 async fn get_users_with_display_name(
1571 &self,
1572 room_id: &RoomId,
1573 display_name: &DisplayName,
1574 ) -> Result<BTreeSet<OwnedUserId>> {
1575 let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1576 let names = vec![self.encode_key(
1577 keys::DISPLAY_NAME,
1578 display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1579 )];
1580
1581 Ok(self
1582 .acquire()
1583 .await?
1584 .get_display_names(room_id, names)
1585 .await?
1586 .into_iter()
1587 .next()
1588 .map(|(_, data)| self.deserialize_json(&data))
1589 .transpose()?
1590 .unwrap_or_default())
1591 }
1592
1593 async fn get_users_with_display_names<'a>(
1594 &self,
1595 room_id: &RoomId,
1596 display_names: &'a [DisplayName],
1597 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1598 let mut result = HashMap::new();
1599
1600 if display_names.is_empty() {
1601 return Ok(result);
1602 }
1603
1604 let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1605 let mut names_map = display_names
1606 .iter()
1607 .flat_map(|display_name| {
1608 let raw =
1618 (self.encode_key(keys::DISPLAY_NAME, display_name.as_raw_str()), display_name);
1619 let normalized = display_name.as_normalized_str().map(|normalized| {
1620 (self.encode_key(keys::DISPLAY_NAME, normalized), display_name)
1621 });
1622
1623 iter::once(raw).chain(normalized.into_iter())
1624 })
1625 .collect::<BTreeMap<_, _>>();
1626 let names = names_map.keys().cloned().collect();
1627
1628 for (name, data) in
1629 self.acquire().await?.get_display_names(room_id, names).await?.into_iter()
1630 {
1631 let display_name =
1632 names_map.remove(name.as_slice()).expect("returned display names were requested");
1633 let user_ids: BTreeSet<_> = self.deserialize_json(&data)?;
1634
1635 result.entry(display_name).or_insert_with(BTreeSet::new).extend(user_ids);
1636 }
1637
1638 Ok(result)
1639 }
1640
1641 async fn get_account_data_event(
1642 &self,
1643 event_type: GlobalAccountDataEventType,
1644 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1645 let event_type = self.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1646 self.acquire()
1647 .await?
1648 .get_global_account_data(event_type)
1649 .await?
1650 .map(|value| self.deserialize_json(&value))
1651 .transpose()
1652 }
1653
1654 async fn get_room_account_data_event(
1655 &self,
1656 room_id: &RoomId,
1657 event_type: RoomAccountDataEventType,
1658 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1659 let room_id = self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1660 let event_type = self.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1661 self.acquire()
1662 .await?
1663 .get_room_account_data(room_id, event_type)
1664 .await?
1665 .map(|value| self.deserialize_json(&value))
1666 .transpose()
1667 }
1668
1669 async fn get_user_room_receipt_event(
1670 &self,
1671 room_id: &RoomId,
1672 receipt_type: ReceiptType,
1673 thread: ReceiptThread,
1674 user_id: &UserId,
1675 ) -> Result<Option<(OwnedEventId, Receipt)>> {
1676 let room_id = self.encode_key(keys::RECEIPT, room_id);
1677 let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1678 let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1681 let user_id = self.encode_key(keys::RECEIPT, user_id);
1682
1683 self.acquire()
1684 .await?
1685 .get_user_receipt(room_id, receipt_type, thread, user_id)
1686 .await?
1687 .map(|value| {
1688 self.deserialize_json::<ReceiptData>(&value).map(|d| (d.event_id, d.receipt))
1689 })
1690 .transpose()
1691 }
1692
1693 async fn get_event_room_receipt_events(
1694 &self,
1695 room_id: &RoomId,
1696 receipt_type: ReceiptType,
1697 thread: ReceiptThread,
1698 event_id: &EventId,
1699 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1700 let room_id = self.encode_key(keys::RECEIPT, room_id);
1701 let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1702 let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1705 let event_id = self.encode_key(keys::RECEIPT, event_id);
1706
1707 self.acquire()
1708 .await?
1709 .get_event_receipts(room_id, receipt_type, thread, event_id)
1710 .await?
1711 .iter()
1712 .map(|value| {
1713 self.deserialize_json::<ReceiptData>(value).map(|d| (d.user_id, d.receipt))
1714 })
1715 .collect()
1716 }
1717
1718 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1719 self.acquire().await?.get_kv_blob(self.encode_custom_key(key)).await
1720 }
1721
1722 async fn set_custom_value_no_read(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
1723 let conn = self.acquire().await?;
1724 let key = self.encode_custom_key(key);
1725 conn.set_kv_blob(key, value).await?;
1726 Ok(())
1727 }
1728
1729 async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1730 let conn = self.acquire().await?;
1731 let key = self.encode_custom_key(key);
1732 let previous = conn.get_kv_blob(key.clone()).await?;
1733 conn.set_kv_blob(key, value).await?;
1734 Ok(previous)
1735 }
1736
1737 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1738 let conn = self.acquire().await?;
1739 let key = self.encode_custom_key(key);
1740 let previous = conn.get_kv_blob(key.clone()).await?;
1741 if previous.is_some() {
1742 conn.delete_kv_blob(key).await?;
1743 }
1744 Ok(previous)
1745 }
1746
1747 async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1748 let this = self.clone();
1749 let room_id = room_id.to_owned();
1750
1751 let conn = self.acquire().await?;
1752
1753 conn.with_transaction(move |txn| -> Result<()> {
1754 let room_info_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1755 txn.remove_room_info(&room_info_room_id)?;
1756
1757 let state_event_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1758 txn.remove_room_state_events(&state_event_room_id, None)?;
1759
1760 let member_room_id = this.encode_key(keys::MEMBER, &room_id);
1761 txn.remove_room_members(&member_room_id, None)?;
1762
1763 let profile_room_id = this.encode_key(keys::PROFILE, &room_id);
1764 txn.remove_room_profiles(&profile_room_id)?;
1765
1766 let room_account_data_room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, &room_id);
1767 txn.remove_room_account_data(&room_account_data_room_id)?;
1768
1769 let receipt_room_id = this.encode_key(keys::RECEIPT, &room_id);
1770 txn.remove_room_receipts(&receipt_room_id)?;
1771
1772 let display_name_room_id = this.encode_key(keys::DISPLAY_NAME, &room_id);
1773 txn.remove_room_display_names(&display_name_room_id)?;
1774
1775 let send_queue_room_id = this.encode_key(keys::SEND_QUEUE, &room_id);
1776 txn.remove_room_send_queue(&send_queue_room_id)?;
1777
1778 let dependent_send_queue_room_id =
1779 this.encode_key(keys::DEPENDENTS_SEND_QUEUE, &room_id);
1780 txn.remove_room_dependent_send_queue(&dependent_send_queue_room_id)?;
1781
1782 Ok(())
1783 })
1784 .await?;
1785
1786 conn.vacuum().await
1787 }
1788
1789 async fn save_send_queue_request(
1790 &self,
1791 room_id: &RoomId,
1792 transaction_id: OwnedTransactionId,
1793 created_at: MilliSecondsSinceUnixEpoch,
1794 content: QueuedRequestKind,
1795 priority: usize,
1796 ) -> Result<(), Self::Error> {
1797 let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
1798 let room_id_value = self.serialize_value(&room_id.to_owned())?;
1799
1800 let content = self.serialize_json(&content)?;
1801 let created_at_ts: u64 = created_at.0.into();
1807 self.acquire()
1808 .await?
1809 .with_transaction(move |txn| {
1810 txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
1811 Ok(())
1812 })
1813 .await
1814 }
1815
1816 async fn update_send_queue_request(
1817 &self,
1818 room_id: &RoomId,
1819 transaction_id: &TransactionId,
1820 content: QueuedRequestKind,
1821 ) -> Result<bool, Self::Error> {
1822 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1823
1824 let content = self.serialize_json(&content)?;
1825 let transaction_id = transaction_id.to_string();
1828
1829 let num_updated = self.acquire()
1830 .await?
1831 .with_transaction(move |txn| {
1832 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
1833 })
1834 .await?;
1835
1836 Ok(num_updated > 0)
1837 }
1838
1839 async fn remove_send_queue_request(
1840 &self,
1841 room_id: &RoomId,
1842 transaction_id: &TransactionId,
1843 ) -> Result<bool, Self::Error> {
1844 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1845
1846 let transaction_id = transaction_id.to_string();
1848
1849 let num_deleted = self
1850 .acquire()
1851 .await?
1852 .with_transaction(move |txn| {
1853 txn.prepare_cached(
1854 "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
1855 )?
1856 .execute((room_id, &transaction_id))
1857 })
1858 .await?;
1859
1860 Ok(num_deleted > 0)
1861 }
1862
1863 async fn load_send_queue_requests(
1864 &self,
1865 room_id: &RoomId,
1866 ) -> Result<Vec<QueuedRequest>, Self::Error> {
1867 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1868
1869 let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, Option<u64>)> = self
1873 .acquire()
1874 .await?
1875 .prepare(
1876 "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
1877 |mut stmt| {
1878 stmt.query((room_id,))?
1879 .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
1880 .collect()
1881 },
1882 )
1883 .await?;
1884
1885 let mut requests = Vec::with_capacity(res.len());
1886 for entry in res {
1887 let created_at = entry
1888 .4
1889 .and_then(UInt::new)
1890 .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
1891 requests.push(QueuedRequest {
1892 transaction_id: entry.0.into(),
1893 kind: self.deserialize_json(&entry.1)?,
1894 error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
1895 priority: entry.3,
1896 created_at,
1897 });
1898 }
1899
1900 Ok(requests)
1901 }
1902
1903 async fn update_send_queue_request_status(
1904 &self,
1905 room_id: &RoomId,
1906 transaction_id: &TransactionId,
1907 error: Option<QueueWedgeError>,
1908 ) -> Result<(), Self::Error> {
1909 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1910
1911 let transaction_id = transaction_id.to_string();
1913
1914 let error_value = error.map(|e| self.serialize_value(&e)).transpose()?;
1916
1917 self.acquire()
1918 .await?
1919 .with_transaction(move |txn| {
1920 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?;
1921 Ok(())
1922 })
1923 .await
1924 }
1925
1926 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1927 let res: Vec<Vec<u8>> = self
1932 .acquire()
1933 .await?
1934 .prepare("SELECT room_id_val FROM send_queue_events", |mut stmt| {
1935 stmt.query(())?.mapped(|row| row.get(0)).collect()
1936 })
1937 .await?;
1938
1939 Ok(res
1942 .into_iter()
1943 .map(|entry| self.deserialize_value(&entry))
1944 .collect::<Result<BTreeSet<OwnedRoomId>, _>>()?
1945 .into_iter()
1946 .collect())
1947 }
1948
1949 async fn save_dependent_queued_request(
1950 &self,
1951 room_id: &RoomId,
1952 parent_txn_id: &TransactionId,
1953 own_txn_id: ChildTransactionId,
1954 created_at: MilliSecondsSinceUnixEpoch,
1955 content: DependentQueuedRequestKind,
1956 ) -> Result<()> {
1957 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
1958 let content = self.serialize_json(&content)?;
1959
1960 let parent_txn_id = parent_txn_id.to_string();
1962 let own_txn_id = own_txn_id.to_string();
1963
1964 let created_at_ts: u64 = created_at.0.into();
1965 self.acquire()
1966 .await?
1967 .with_transaction(move |txn| {
1968 txn.prepare_cached(
1969 r#"INSERT INTO dependent_send_queue_events
1970 (room_id, parent_transaction_id, own_transaction_id, content, created_at)
1971 VALUES (?, ?, ?, ?, ?)"#,
1972 )?
1973 .execute((
1974 room_id,
1975 parent_txn_id,
1976 own_txn_id,
1977 content,
1978 created_at_ts,
1979 ))?;
1980 Ok(())
1981 })
1982 .await
1983 }
1984
1985 async fn update_dependent_queued_request(
1986 &self,
1987 room_id: &RoomId,
1988 own_transaction_id: &ChildTransactionId,
1989 new_content: DependentQueuedRequestKind,
1990 ) -> Result<bool> {
1991 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
1992 let content = self.serialize_json(&new_content)?;
1993
1994 let own_txn_id = own_transaction_id.to_string();
1996
1997 let num_updated = self
1998 .acquire()
1999 .await?
2000 .with_transaction(move |txn| {
2001 txn.prepare_cached(
2002 r#"UPDATE dependent_send_queue_events
2003 SET content = ?
2004 WHERE own_transaction_id = ?
2005 AND room_id = ?"#,
2006 )?
2007 .execute((content, own_txn_id, room_id))
2008 })
2009 .await?;
2010
2011 if num_updated > 1 {
2012 return Err(Error::InconsistentUpdate);
2013 }
2014
2015 Ok(num_updated == 1)
2016 }
2017
2018 async fn mark_dependent_queued_requests_as_ready(
2019 &self,
2020 room_id: &RoomId,
2021 parent_txn_id: &TransactionId,
2022 parent_key: SentRequestKey,
2023 ) -> Result<usize> {
2024 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2025 let parent_key = self.serialize_value(&parent_key)?;
2026
2027 let parent_txn_id = parent_txn_id.to_string();
2029
2030 self.acquire()
2031 .await?
2032 .with_transaction(move |txn| {
2033 Ok(txn.prepare_cached(
2034 "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?",
2035 )?
2036 .execute((parent_key, parent_txn_id, room_id))?)
2037 })
2038 .await
2039 }
2040
2041 async fn remove_dependent_queued_request(
2042 &self,
2043 room_id: &RoomId,
2044 txn_id: &ChildTransactionId,
2045 ) -> Result<bool> {
2046 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2047
2048 let txn_id = txn_id.to_string();
2050
2051 let num_deleted = self
2052 .acquire()
2053 .await?
2054 .with_transaction(move |txn| {
2055 txn.prepare_cached(
2056 "DELETE FROM dependent_send_queue_events WHERE own_transaction_id = ? AND room_id = ?",
2057 )?
2058 .execute((txn_id, room_id))
2059 })
2060 .await?;
2061
2062 Ok(num_deleted > 0)
2063 }
2064
2065 async fn load_dependent_queued_requests(
2066 &self,
2067 room_id: &RoomId,
2068 ) -> Result<Vec<DependentQueuedRequest>> {
2069 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2070
2071 let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, Option<u64>)> = self
2073 .acquire()
2074 .await?
2075 .prepare(
2076 "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2077 |mut stmt| {
2078 stmt.query((room_id,))?
2079 .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2080 .collect()
2081 },
2082 )
2083 .await?;
2084
2085 let mut dependent_events = Vec::with_capacity(res.len());
2086 for entry in res {
2087 let created_at = entry
2088 .4
2089 .and_then(UInt::new)
2090 .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2091 dependent_events.push(DependentQueuedRequest {
2092 own_transaction_id: entry.0.into(),
2093 parent_transaction_id: entry.1.into(),
2094 parent_key: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?,
2095 kind: self.deserialize_json(&entry.3)?,
2096 created_at,
2097 });
2098 }
2099
2100 Ok(dependent_events)
2101 }
2102}
2103
/// The payload stored for each receipt row.
///
/// Besides the receipt itself it carries the event and user IDs, which the
/// receipt getters read back from the deserialized blob.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ReceiptData {
    /// The receipt itself.
    receipt: Receipt,
    /// The event the receipt points at.
    event_id: OwnedEventId,
    /// The user the receipt belongs to.
    user_id: OwnedUserId,
}
2110
/// Runs the shared state store integration tests against a store opened
/// without a passphrase.
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
    use once_cell::sync::Lazy;
    use tempfile::{tempdir, TempDir};

    use super::SqliteStateStore;

    // One temporary directory shared by the module; every store gets its
    // own numbered sub-directory inside it.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Opens a fresh store in a new numbered sub-directory of `TMP_DIR`.
    async fn get_store() -> Result<impl StateStore, StoreError> {
        let store_index = NUM.fetch_add(1, SeqCst);
        let store_dir = TMP_DIR.path().join(store_index.to_string());
        let store_dir_str = store_dir.to_str().unwrap();

        tracing::info!("using store @ {}", store_dir_str);

        let store = SqliteStateStore::open(store_dir_str, None).await.unwrap();
        Ok(store)
    }

    statestore_integration_tests!();
}
2135
/// Runs the shared state store integration tests against a store opened
/// with a passphrase, plus tests for the store configuration options.
#[cfg(test)]
mod encrypted_tests {
    use std::{
        path::PathBuf,
        sync::atomic::{AtomicU32, Ordering::SeqCst},
    };

    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
    use matrix_sdk_test::async_test;
    use once_cell::sync::Lazy;
    use tempfile::{tempdir, TempDir};

    use super::SqliteStateStore;
    use crate::{utils::SqliteAsyncConnExt, SqliteStoreConfig};

    // One temporary directory shared by the module; every store gets its
    // own numbered sub-directory inside it.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Returns a new, unique path inside `TMP_DIR`.
    fn new_state_store_workspace() -> PathBuf {
        let workspace_index = NUM.fetch_add(1, SeqCst);
        TMP_DIR.path().join(workspace_index.to_string())
    }

    /// Opens a fresh store protected by a fixed test passphrase.
    async fn get_store() -> Result<impl StateStore, StoreError> {
        let workspace = new_state_store_workspace();
        let workspace_str = workspace.to_str().unwrap();

        tracing::info!("using store @ {}", workspace_str);

        let store = SqliteStateStore::open(workspace_str, Some("default_test_password"))
            .await
            .unwrap();
        Ok(store)
    }

    /// The configured maximum pool size is applied to the connection pool.
    #[async_test]
    async fn test_pool_size() {
        let config = SqliteStoreConfig::new(new_state_store_workspace()).pool_max_size(42);

        let store = SqliteStateStore::open_with_config(config).await.unwrap();

        assert_eq!(store.pool.status().max_size, 42);
    }

    /// The configured cache size is reflected by `PRAGMA cache_size`.
    #[async_test]
    async fn test_cache_size() {
        let config = SqliteStoreConfig::new(new_state_store_workspace()).cache_size(1500);

        let store = SqliteStateStore::open_with_config(config).await.unwrap();
        let conn = store.pool.get().await.unwrap();

        let reported_cache_size =
            conn.query_row("PRAGMA cache_size", (), |row| row.get::<_, i32>(0)).await.unwrap();

        // The pragma is expected to report the negated configured value
        // divided by 1024.
        assert_eq!(reported_cache_size, -(1500 / 1024));
    }

    /// The configured journal size limit is reflected by
    /// `PRAGMA journal_size_limit`.
    #[async_test]
    async fn test_journal_size_limit() {
        let config =
            SqliteStoreConfig::new(new_state_store_workspace()).journal_size_limit(1500);

        let store = SqliteStateStore::open_with_config(config).await.unwrap();
        let conn = store.pool.get().await.unwrap();

        let reported_limit = conn
            .query_row("PRAGMA journal_size_limit", (), |row| row.get::<_, u32>(0))
            .await
            .unwrap();

        assert_eq!(reported_limit, 1500);
    }

    statestore_integration_tests!();
}
2216
2217#[cfg(test)]
2218mod migration_tests {
2219 use std::{
2220 path::{Path, PathBuf},
2221 sync::{
2222 atomic::{AtomicU32, Ordering::SeqCst},
2223 Arc,
2224 },
2225 };
2226
2227 use deadpool_sqlite::Runtime;
2228 use matrix_sdk_base::{
2229 store::{ChildTransactionId, DependentQueuedRequestKind, SerializableEventContent},
2230 sync::UnreadNotificationsCount,
2231 RoomState, StateStore,
2232 };
2233 use matrix_sdk_test::async_test;
2234 use once_cell::sync::Lazy;
2235 use ruma::{
2236 events::{
2237 room::{create::RoomCreateEventContent, message::RoomMessageEventContent},
2238 StateEventType,
2239 },
2240 room_id, server_name, user_id, EventId, MilliSecondsSinceUnixEpoch, RoomId, TransactionId,
2241 UserId,
2242 };
2243 use rusqlite::Transaction;
2244 use serde_json::json;
2245 use tempfile::{tempdir, TempDir};
2246 use tokio::fs;
2247
2248 use super::{init, keys, SqliteStateStore, DATABASE_NAME};
2249 use crate::{
2250 error::{Error, Result},
2251 utils::{SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
2252 OpenStoreError,
2253 };
2254
2255 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2256 static NUM: AtomicU32 = AtomicU32::new(0);
2257 const SECRET: &str = "secret";
2258
2259 fn new_path() -> PathBuf {
2260 let name = NUM.fetch_add(1, SeqCst).to_string();
2261 TMP_DIR.path().join(name)
2262 }
2263
2264 async fn create_fake_db(path: &Path, version: u8) -> Result<SqliteStateStore> {
2265 fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir).unwrap();
2266
2267 let config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
2268 let pool = config.create_pool(Runtime::Tokio1).unwrap();
2271 let conn = pool.get().await?;
2272
2273 init(&conn).await?;
2274
2275 let store_cipher = Some(Arc::new(conn.get_or_create_store_cipher(SECRET).await.unwrap()));
2276 let this = SqliteStateStore { store_cipher, pool };
2277 this.run_migrations(&conn, 1, Some(version)).await?;
2278
2279 Ok(this)
2280 }
2281
2282 fn room_info_v1_json(
2283 room_id: &RoomId,
2284 state: RoomState,
2285 name: Option<&str>,
2286 creator: Option<&UserId>,
2287 ) -> serde_json::Value {
2288 let name_content = match name {
2290 Some(name) => json!({ "name": name }),
2291 None => json!({ "name": null }),
2292 };
2293 let create_content = match creator {
2295 Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2296 None => RoomCreateEventContent::new_v11(),
2297 };
2298
2299 json!({
2300 "room_id": room_id,
2301 "room_type": state,
2302 "notification_counts": UnreadNotificationsCount::default(),
2303 "summary": {
2304 "heroes": [],
2305 "joined_member_count": 0,
2306 "invited_member_count": 0,
2307 },
2308 "members_synced": false,
2309 "base_info": {
2310 "dm_targets": [],
2311 "max_power_level": 100,
2312 "name": {
2313 "Original": {
2314 "content": name_content,
2315 },
2316 },
2317 "create": {
2318 "Original": {
2319 "content": create_content,
2320 }
2321 }
2322 },
2323 })
2324 }
2325
2326 #[async_test]
2327 pub async fn test_migrating_v1_to_v2() {
2328 let path = new_path();
2329 {
2331 let db = create_fake_db(&path, 1).await.unwrap();
2332 let conn = db.pool.get().await.unwrap();
2333
2334 let this = db.clone();
2335 conn.with_transaction(move |txn| {
2336 for i in 0..5 {
2337 let room_id = RoomId::parse(format!("!room_{i}:localhost")).unwrap();
2338 let (state, stripped) =
2339 if i < 3 { (RoomState::Joined, false) } else { (RoomState::Invited, true) };
2340 let info = room_info_v1_json(&room_id, state, None, None);
2341
2342 let room_id = this.encode_key(keys::ROOM_INFO, room_id);
2343 let data = this.serialize_json(&info)?;
2344
2345 txn.prepare_cached(
2346 "INSERT INTO room_info (room_id, stripped, data)
2347 VALUES (?, ?, ?)",
2348 )?
2349 .execute((room_id, stripped, data))?;
2350 }
2351
2352 Result::<_, Error>::Ok(())
2353 })
2354 .await
2355 .unwrap();
2356 }
2357
2358 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2360
2361 assert_eq!(store.get_room_infos().await.unwrap().len(), 5);
2363 }
2364
2365 fn add_room_v2(
2367 this: &SqliteStateStore,
2368 txn: &Transaction<'_>,
2369 room_id: &RoomId,
2370 name: Option<&str>,
2371 create_creator: Option<&UserId>,
2372 create_sender: Option<&UserId>,
2373 ) -> Result<(), Error> {
2374 let room_info_json = room_info_v1_json(room_id, RoomState::Joined, name, create_creator);
2375
2376 let encoded_room_id = this.encode_key(keys::ROOM_INFO, room_id);
2377 let encoded_state =
2378 this.encode_key(keys::ROOM_INFO, serde_json::to_string(&RoomState::Joined)?);
2379 let data = this.serialize_json(&room_info_json)?;
2380
2381 txn.prepare_cached(
2382 "INSERT INTO room_info (room_id, state, data)
2383 VALUES (?, ?, ?)",
2384 )?
2385 .execute((encoded_room_id, encoded_state, data))?;
2386
2387 let Some(create_sender) = create_sender else {
2389 return Ok(());
2390 };
2391
2392 let create_content = match create_creator {
2393 Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2394 None => RoomCreateEventContent::new_v11(),
2395 };
2396
2397 let event_id = EventId::new(server_name!("dummy.local"));
2398 let create_event = json!({
2399 "content": create_content,
2400 "event_id": event_id,
2401 "sender": create_sender.to_owned(),
2402 "origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
2403 "state_key": "",
2404 "type": "m.room.create",
2405 "unsigned": {},
2406 });
2407
2408 let encoded_room_id = this.encode_key(keys::STATE_EVENT, room_id);
2409 let encoded_event_type =
2410 this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
2411 let encoded_state_key = this.encode_key(keys::STATE_EVENT, "");
2412 let stripped = false;
2413 let encoded_event_id = this.encode_key(keys::STATE_EVENT, event_id);
2414 let data = this.serialize_json(&create_event)?;
2415
2416 txn.prepare_cached(
2417 "INSERT
2418 INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
2419 VALUES (?, ?, ?, ?, ?, ?)",
2420 )?
2421 .execute((
2422 encoded_room_id,
2423 encoded_event_type,
2424 encoded_state_key,
2425 stripped,
2426 encoded_event_id,
2427 data,
2428 ))?;
2429
2430 Ok(())
2431 }
2432
2433 #[async_test]
2434 pub async fn test_migrating_v2_to_v3() {
2435 let path = new_path();
2436
2437 let room_a_id = room_id!("!room_a:dummy.local");
2439 let room_a_name = "Room A";
2440 let room_a_creator = user_id!("@creator:dummy.local");
2441 let room_a_create_sender = user_id!("@sender:dummy.local");
2444
2445 let room_b_id = room_id!("!room_b:dummy.local");
2447
2448 let room_c_id = room_id!("!room_c:dummy.local");
2450 let room_c_create_sender = user_id!("@creator:dummy.local");
2451
2452 {
2454 let db = create_fake_db(&path, 2).await.unwrap();
2455 let conn = db.pool.get().await.unwrap();
2456
2457 let this = db.clone();
2458 conn.with_transaction(move |txn| {
2459 add_room_v2(
2460 &this,
2461 txn,
2462 room_a_id,
2463 Some(room_a_name),
2464 Some(room_a_creator),
2465 Some(room_a_create_sender),
2466 )?;
2467 add_room_v2(&this, txn, room_b_id, None, None, None)?;
2468 add_room_v2(&this, txn, room_c_id, None, None, Some(room_c_create_sender))?;
2469
2470 Result::<_, Error>::Ok(())
2471 })
2472 .await
2473 .unwrap();
2474 }
2475
2476 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2478
2479 let room_infos = store.get_room_infos().await.unwrap();
2481 assert_eq!(room_infos.len(), 3);
2482
2483 let room_a = room_infos.iter().find(|r| r.room_id() == room_a_id).unwrap();
2484 assert_eq!(room_a.name(), Some(room_a_name));
2485 assert_eq!(room_a.creator(), Some(room_a_create_sender));
2486
2487 let room_b = room_infos.iter().find(|r| r.room_id() == room_b_id).unwrap();
2488 assert_eq!(room_b.name(), None);
2489 assert_eq!(room_b.creator(), None);
2490
2491 let room_c = room_infos.iter().find(|r| r.room_id() == room_c_id).unwrap();
2492 assert_eq!(room_c.name(), None);
2493 assert_eq!(room_c.creator(), Some(room_c_create_sender));
2494 }
2495
2496 #[async_test]
2497 pub async fn test_migrating_v7_to_v9() {
2498 let path = new_path();
2499
2500 let room_id = room_id!("!room_a:dummy.local");
2501 let wedged_event_transaction_id = TransactionId::new();
2502 let local_event_transaction_id = TransactionId::new();
2503
2504 {
2506 let db = create_fake_db(&path, 7).await.unwrap();
2507 let conn = db.pool.get().await.unwrap();
2508
2509 let wedge_tx = wedged_event_transaction_id.clone();
2510 let local_tx = local_event_transaction_id.clone();
2511
2512 conn.with_transaction(move |txn| {
2513 add_dependent_send_queue_event_v7(
2514 &db,
2515 txn,
2516 room_id,
2517 &local_tx,
2518 ChildTransactionId::new(),
2519 DependentQueuedRequestKind::RedactEvent,
2520 )?;
2521 add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?;
2522 add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?;
2523 Result::<_, Error>::Ok(())
2524 })
2525 .await
2526 .unwrap();
2527 }
2528
2529 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2532
2533 let requests = store.load_send_queue_requests(room_id).await.unwrap();
2534 assert!(requests.is_empty());
2535
2536 let dependent_requests = store.load_dependent_queued_requests(room_id).await.unwrap();
2537 assert!(dependent_requests.is_empty());
2538 }
2539
2540 fn add_send_queue_event_v7(
2541 this: &SqliteStateStore,
2542 txn: &Transaction<'_>,
2543 transaction_id: &TransactionId,
2544 room_id: &RoomId,
2545 is_wedged: bool,
2546 ) -> Result<(), Error> {
2547 let content =
2548 SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?;
2549
2550 let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id);
2551 let room_id_value = this.serialize_value(&room_id.to_owned())?;
2552
2553 let content = this.serialize_json(&content)?;
2554
2555 txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
2556 .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;
2557
2558 Ok(())
2559 }
2560
2561 fn add_dependent_send_queue_event_v7(
2562 this: &SqliteStateStore,
2563 txn: &Transaction<'_>,
2564 room_id: &RoomId,
2565 parent_txn_id: &TransactionId,
2566 own_txn_id: ChildTransactionId,
2567 content: DependentQueuedRequestKind,
2568 ) -> Result<(), Error> {
2569 let room_id_value = this.serialize_value(&room_id.to_owned())?;
2570
2571 let parent_txn_id = parent_txn_id.to_string();
2572 let own_txn_id = own_txn_id.to_string();
2573 let content = this.serialize_json(&content)?;
2574
2575 txn.prepare_cached(
2576 "INSERT INTO dependent_send_queue_events
2577 (room_id, parent_transaction_id, own_transaction_id, content)
2578 VALUES (?, ?, ?, ?)",
2579 )?
2580 .execute((room_id_value, parent_txn_id, own_txn_id, content))?;
2581
2582 Ok(())
2583 }
2584}