diff --git a/crates/store_sqlite/src/addressbook_store.rs b/crates/store_sqlite/src/addressbook_store.rs index 0452f6e..a7f4b87 100644 --- a/crates/store_sqlite/src/addressbook_store.rs +++ b/crates/store_sqlite/src/addressbook_store.rs @@ -5,9 +5,9 @@ use rustical_store::{ synctoken::format_synctoken, AddressObject, Addressbook, AddressbookStore, CollectionOperation, CollectionOperationDomain, CollectionOperationType, Error, }; -use sqlx::{Sqlite, SqlitePool, Transaction}; +use sqlx::{Acquire, Executor, Sqlite, SqlitePool, Transaction}; use tokio::sync::mpsc::Sender; -use tracing::instrument; +use tracing::{error, instrument}; #[derive(Debug, Clone)] struct AddressObjectRow { @@ -23,7 +23,560 @@ impl TryFrom for AddressObject { } } -// Logs an operation to the events +#[derive(Debug, Constructor)] +pub struct SqliteAddressbookStore { + db: SqlitePool, + sender: Sender, +} + +impl SqliteAddressbookStore { + async fn _get_addressbook<'e, E: Executor<'e, Database = Sqlite>>( + executor: E, + principal: &str, + id: &str, + ) -> Result { + let addressbook = sqlx::query_as!( + Addressbook, + r#"SELECT principal, id, synctoken, displayname, description, deleted_at, push_topic + FROM addressbooks + WHERE (principal, id) = (?, ?)"#, + principal, + id + ) + .fetch_one(executor) + .await + .map_err(crate::Error::from)?; + Ok(addressbook) + } + + async fn _get_addressbooks<'e, E: Executor<'e, Database = Sqlite>>( + executor: E, + principal: &str, + ) -> Result, rustical_store::Error> { + let addressbooks = sqlx::query_as!( + Addressbook, + r#"SELECT principal, id, synctoken, displayname, description, deleted_at, push_topic + FROM addressbooks + WHERE principal = ? AND deleted_at IS NULL"#, + principal + ) + .fetch_all(executor) + .await + .map_err(crate::Error::from)?; + Ok(addressbooks) + } + + async fn _get_deleted_addressbooks<'e, E: Executor<'e, Database = Sqlite>>( + executor: E, + principal: &str, + ) -> Result, rustical_store::Error> { + let addressbooks = sqlx::query_as!( + Addressbook, + r#"SELECT principal, id, synctoken, displayname, description, deleted_at, push_topic + FROM addressbooks + WHERE principal = ? AND deleted_at IS NOT NULL"#, + principal + ) + .fetch_all(executor) + .await + .map_err(crate::Error::from)?; + Ok(addressbooks) + } + + async fn _update_addressbook<'e, E: Executor<'e, Database = Sqlite>>( + executor: E, + principal: String, + id: String, + addressbook: Addressbook, + ) -> Result<(), rustical_store::Error> { + let result = sqlx::query!( + r#"UPDATE addressbooks SET principal = ?, id = ?, displayname = ?, description = ?, push_topic = ? + WHERE (principal, id) = (?, ?)"#, + addressbook.principal, + addressbook.id, + addressbook.displayname, + addressbook.description, + addressbook.push_topic, + principal, + id + ) + .execute(executor) + .await + .map_err(crate::Error::from)?; + if result.rows_affected() == 0 { + return Err(rustical_store::Error::NotFound); + } + Ok(()) + } + + async fn _insert_addressbook<'e, E: Executor<'e, Database = Sqlite>>( + executor: E, + addressbook: Addressbook, + ) -> Result<(), rustical_store::Error> { + sqlx::query!( + r#"INSERT INTO addressbooks (principal, id, displayname, description, push_topic) + VALUES (?, ?, ?, ?, ?)"#, + addressbook.principal, + addressbook.id, + addressbook.displayname, + addressbook.description, + addressbook.push_topic, + ) + .execute(executor) + .await + .map_err(crate::Error::from)?; + Ok(()) + } + + async fn _delete_addressbook<'e, E: Executor<'e, Database = Sqlite>>( + executor: E, + principal: &str, + addressbook_id: &str, + use_trashbin: bool, + ) -> Result<(), rustical_store::Error> { + match use_trashbin { + true => { + sqlx::query!( + r#"UPDATE addressbooks SET deleted_at = datetime() WHERE (principal, id) = (?, ?)"#, + principal, addressbook_id + ) + .execute(executor) + .await.map_err(crate::Error::from)?; + } + false => { + sqlx::query!( + r#"DELETE FROM addressbooks WHERE (principal, id) = (?, ?)"#, + principal, + addressbook_id + ) + .execute(executor) + .await + .map_err(crate::Error::from)?; + } + }; + + Ok(()) + } + + async fn _restore_addressbook<'e, E: Executor<'e, Database = Sqlite>>( + executor: E, + principal: &str, + addressbook_id: &str, + ) -> Result<(), rustical_store::Error> { + sqlx::query!( + r"UPDATE addressbooks SET deleted_at = NULL WHERE (principal, id) = (?, ?)", + principal, + addressbook_id + ) + .execute(executor) + .await + .map_err(crate::Error::from)?; + Ok(()) + } + + async fn _sync_changes<'a, A: Acquire<'a, Database = Sqlite>>( + acquire: A, + principal: &str, + addressbook_id: &str, + synctoken: i64, + ) -> Result<(Vec, Vec, i64), rustical_store::Error> { + struct Row { + object_id: String, + synctoken: i64, + } + + let mut conn = acquire.acquire().await.map_err(crate::Error::from)?; + + let changes = sqlx::query_as!( + Row, + r#" + SELECT DISTINCT object_id, max(0, synctoken) as "synctoken!: i64" from addressobjectchangelog + WHERE synctoken > ? + ORDER BY synctoken ASC + "#, + synctoken + ) + .fetch_all(&mut *conn) + .await.map_err(crate::Error::from)?; + + let mut objects = vec![]; + let mut deleted_objects = vec![]; + + let new_synctoken = changes + .last() + .map(|&Row { synctoken, .. }| synctoken) + .unwrap_or(0); + + for Row { object_id, .. } in changes { + match Self::_get_object(&mut *conn, principal, addressbook_id, &object_id).await { + Ok(object) => objects.push(object), + Err(rustical_store::Error::NotFound) => deleted_objects.push(object_id), + Err(err) => return Err(err), + } + } + + Ok((objects, deleted_objects, new_synctoken)) + } + + async fn _get_objects<'e, E: Executor<'e, Database = Sqlite>>( + executor: E, + principal: &str, + addressbook_id: &str, + ) -> Result, rustical_store::Error> { + sqlx::query_as!( + AddressObjectRow, + "SELECT id, vcf FROM addressobjects WHERE principal = ? AND addressbook_id = ? AND deleted_at IS NULL", + principal, + addressbook_id + ) + .fetch_all(executor) + .await.map_err(crate::Error::from)? + .into_iter() + .map(|row| row.try_into().map_err(rustical_store::Error::from)) + .collect() + } + + async fn _get_object<'e, E: Executor<'e, Database = Sqlite>>( + executor: E, + principal: &str, + addressbook_id: &str, + object_id: &str, + ) -> Result { + Ok(sqlx::query_as!( + AddressObjectRow, + "SELECT id, vcf FROM addressobjects WHERE (principal, addressbook_id, id) = (?, ?, ?)", + principal, + addressbook_id, + object_id + ) + .fetch_one(executor) + .await + .map_err(crate::Error::from)? + .try_into()?) + } + + async fn _put_object<'e, E: Executor<'e, Database = Sqlite>>( + executor: E, + principal: String, + addressbook_id: String, + object: AddressObject, + overwrite: bool, + ) -> Result<(), rustical_store::Error> { + let (object_id, vcf) = (object.get_id(), object.get_vcf()); + + (if overwrite { + sqlx::query!( + "REPLACE INTO addressobjects (principal, addressbook_id, id, vcf) VALUES (?, ?, ?, ?)", + principal, + addressbook_id, + object_id, + vcf + ) + } else { + // If the object already exists a database error is thrown and handled in error.rs + sqlx::query!( + "INSERT INTO addressobjects (principal, addressbook_id, id, vcf) VALUES (?, ?, ?, ?)", + principal, + addressbook_id, + object_id, + vcf + ) + }) + .execute(executor) + .await + .map_err(crate::Error::from)?; + + Ok(()) + } + + async fn _delete_object<'e, E: Executor<'e, Database = Sqlite>>( + executor: E, + principal: &str, + addressbook_id: &str, + object_id: &str, + use_trashbin: bool, + ) -> Result<(), rustical_store::Error> { + match use_trashbin { + true => { + sqlx::query!( + "UPDATE addressobjects SET deleted_at = datetime(), updated_at = datetime() WHERE (principal, addressbook_id, id) = (?, ?, ?)", + principal, + addressbook_id, + object_id + ) + .execute(executor) + .await.map_err(crate::Error::from)?; + } + false => { + sqlx::query!( + "DELETE FROM addressobjects WHERE addressbook_id = ? AND id = ?", + addressbook_id, + object_id + ) + .execute(executor) + .await + .map_err(crate::Error::from)?; + } + }; + Ok(()) + } + + async fn _restore_object<'e, E: Executor<'e, Database = Sqlite>>( + executor: E, + principal: &str, + addressbook_id: &str, + object_id: &str, + ) -> Result<(), rustical_store::Error> { + sqlx::query!( + r#"UPDATE addressobjects SET deleted_at = NULL, updated_at = datetime() WHERE (principal, addressbook_id, id) = (?, ?, ?)"#, + principal, + addressbook_id, + object_id + ) + .execute(executor) + .await.map_err(crate::Error::from)?; + Ok(()) + } +} + +#[async_trait] +impl AddressbookStore for SqliteAddressbookStore { + #[instrument] + async fn get_addressbook( + &self, + principal: &str, + id: &str, + ) -> Result { + Self::_get_addressbook(&self.db, principal, id).await + } + + #[instrument] + async fn get_addressbooks( + &self, + principal: &str, + ) -> Result, rustical_store::Error> { + Self::_get_addressbooks(&self.db, principal).await + } + + #[instrument] + async fn get_deleted_addressbooks( + &self, + principal: &str, + ) -> Result, rustical_store::Error> { + Self::_get_deleted_addressbooks(&self.db, principal).await + } + + #[instrument] + async fn update_addressbook( + &self, + principal: String, + id: String, + addressbook: Addressbook, + ) -> Result<(), rustical_store::Error> { + Self::_update_addressbook(&self.db, principal, id, addressbook).await + } + + #[instrument] + async fn insert_addressbook( + &self, + addressbook: Addressbook, + ) -> Result<(), rustical_store::Error> { + Self::_insert_addressbook(&self.db, addressbook).await + } + + #[instrument] + async fn delete_addressbook( + &self, + principal: &str, + addressbook_id: &str, + use_trashbin: bool, + ) -> Result<(), rustical_store::Error> { + let mut tx = self.db.begin().await.map_err(crate::Error::from)?; + + let addressbook = match Self::_get_addressbook(&mut *tx, principal, addressbook_id).await { + Ok(addressbook) => Some(addressbook), + Err(Error::NotFound) => None, + Err(err) => return Err(err), + }; + + Self::_delete_addressbook(&mut *tx, principal, addressbook_id, use_trashbin).await?; + tx.commit().await.map_err(crate::Error::from)?; + + if let Some(addressbook) = addressbook { + if let Err(err) = self.sender.try_send(CollectionOperation { + r#type: CollectionOperationType::Delete, + domain: CollectionOperationDomain::Addressbook, + topic: addressbook.push_topic, + sync_token: None, + }) { + error!("Push notification about deleted addressbook failed: {err}"); + }; + } + + Ok(()) + } + + #[instrument] + async fn restore_addressbook( + &self, + principal: &str, + addressbook_id: &str, + ) -> Result<(), rustical_store::Error> { + Self::_restore_addressbook(&self.db, principal, addressbook_id).await + } + + #[instrument] + async fn sync_changes( + &self, + principal: &str, + addressbook_id: &str, + synctoken: i64, + ) -> Result<(Vec, Vec, i64), rustical_store::Error> { + Self::_sync_changes(&self.db, principal, addressbook_id, synctoken).await + } + + #[instrument] + async fn get_objects( + &self, + principal: &str, + addressbook_id: &str, + ) -> Result, rustical_store::Error> { + Self::_get_objects(&self.db, principal, addressbook_id).await + } + + #[instrument] + async fn get_object( + &self, + principal: &str, + addressbook_id: &str, + object_id: &str, + ) -> Result { + Self::_get_object(&self.db, principal, addressbook_id, object_id).await + } + + #[instrument] + async fn put_object( + &self, + principal: String, + addressbook_id: String, + object: AddressObject, + overwrite: bool, + ) -> Result<(), rustical_store::Error> { + let mut tx = self.db.begin().await.map_err(crate::Error::from)?; + + let object_id = object.get_id().to_owned(); + + Self::_put_object( + &mut *tx, + principal.to_owned(), + addressbook_id.to_owned(), + object, + overwrite, + ) + .await?; + + let synctoken = log_object_operation( + &mut tx, + &principal, + &addressbook_id, + &object_id, + ChangeOperation::Add, + ) + .await + .map_err(crate::Error::from)?; + + tx.commit().await.map_err(crate::Error::from)?; + + if let Err(err) = self.sender.try_send(CollectionOperation { + r#type: CollectionOperationType::Object, + domain: CollectionOperationDomain::Addressbook, + topic: self + .get_addressbook(&principal, &addressbook_id) + .await? + .push_topic, + sync_token: Some(synctoken), + }) { + error!("Push notification about deleted addressbook failed: {err}"); + }; + + Ok(()) + } + + #[instrument] + async fn delete_object( + &self, + principal: &str, + addressbook_id: &str, + object_id: &str, + use_trashbin: bool, + ) -> Result<(), rustical_store::Error> { + let mut tx = self.db.begin().await.map_err(crate::Error::from)?; + + Self::_delete_object(&mut *tx, principal, addressbook_id, object_id, use_trashbin).await?; + + let synctoken = log_object_operation( + &mut tx, + principal, + addressbook_id, + object_id, + ChangeOperation::Delete, + ) + .await + .map_err(crate::Error::from)?; + + tx.commit().await.map_err(crate::Error::from)?; + + // TODO: Watch for errors here? + let _ = self.sender.try_send(CollectionOperation { + r#type: CollectionOperationType::Object, + domain: CollectionOperationDomain::Addressbook, + topic: self + .get_addressbook(principal, addressbook_id) + .await? + .push_topic, + sync_token: Some(synctoken), + }); + Ok(()) + } + + #[instrument] + async fn restore_object( + &self, + principal: &str, + addressbook_id: &str, + object_id: &str, + ) -> Result<(), rustical_store::Error> { + let mut tx = self.db.begin().await.map_err(crate::Error::from)?; + + Self::_restore_object(&mut *tx, principal, addressbook_id, object_id).await?; + + let synctoken = log_object_operation( + &mut tx, + principal, + addressbook_id, + object_id, + ChangeOperation::Add, + ) + .await + .map_err(crate::Error::from)?; + tx.commit().await.map_err(crate::Error::from)?; + + // TODO: Watch for errors here? + let _ = self.sender.try_send(CollectionOperation { + r#type: CollectionOperationType::Object, + domain: CollectionOperationDomain::Addressbook, + topic: self + .get_addressbook(principal, addressbook_id) + .await? + .push_topic, + sync_token: Some(synctoken), + }); + + Ok(()) + } +} + +// Logs an operation to an address object async fn log_object_operation( tx: &mut Transaction<'_, Sqlite>, principal: &str, @@ -62,420 +615,3 @@ async fn log_object_operation( .await?; Ok(format_synctoken(synctoken)) } - -#[derive(Debug, Constructor)] -pub struct SqliteAddressbookStore { - db: SqlitePool, - sender: Sender, -} - -#[async_trait] -impl AddressbookStore for SqliteAddressbookStore { - #[instrument] - async fn get_addressbook( - &self, - principal: &str, - id: &str, - ) -> Result { - let addressbook = sqlx::query_as!( - Addressbook, - r#"SELECT principal, id, synctoken, displayname, description, deleted_at, push_topic - FROM addressbooks - WHERE (principal, id) = (?, ?)"#, - principal, - id - ) - .fetch_one(&self.db) - .await - .map_err(crate::Error::from)?; - Ok(addressbook) - } - - #[instrument] - async fn get_addressbooks( - &self, - principal: &str, - ) -> Result, rustical_store::Error> { - let addressbooks = sqlx::query_as!( - Addressbook, - r#"SELECT principal, id, synctoken, displayname, description, deleted_at, push_topic - FROM addressbooks - WHERE principal = ? AND deleted_at IS NULL"#, - principal - ) - .fetch_all(&self.db) - .await - .map_err(crate::Error::from)?; - Ok(addressbooks) - } - - #[instrument] - async fn get_deleted_addressbooks( - &self, - principal: &str, - ) -> Result, rustical_store::Error> { - let addressbooks = sqlx::query_as!( - Addressbook, - r#"SELECT principal, id, synctoken, displayname, description, deleted_at, push_topic - FROM addressbooks - WHERE principal = ? AND deleted_at IS NOT NULL"#, - principal - ) - .fetch_all(&self.db) - .await - .map_err(crate::Error::from)?; - Ok(addressbooks) - } - - #[instrument] - async fn update_addressbook( - &self, - principal: String, - id: String, - addressbook: Addressbook, - ) -> Result<(), rustical_store::Error> { - let result = sqlx::query!( - r#"UPDATE addressbooks SET principal = ?, id = ?, displayname = ?, description = ?, push_topic = ? - WHERE (principal, id) = (?, ?)"#, - addressbook.principal, - addressbook.id, - addressbook.displayname, - addressbook.description, - addressbook.push_topic, - principal, - id - ) - .execute(&self.db) - .await - .map_err(crate::Error::from)?; - if result.rows_affected() == 0 { - return Err(rustical_store::Error::NotFound); - } - Ok(()) - } - - #[instrument] - async fn insert_addressbook( - &self, - addressbook: Addressbook, - ) -> Result<(), rustical_store::Error> { - sqlx::query!( - r#"INSERT INTO addressbooks (principal, id, displayname, description, push_topic) - VALUES (?, ?, ?, ?, ?)"#, - addressbook.principal, - addressbook.id, - addressbook.displayname, - addressbook.description, - addressbook.push_topic, - ) - .execute(&self.db) - .await - .map_err(crate::Error::from)?; - Ok(()) - } - - #[instrument] - async fn delete_addressbook( - &self, - principal: &str, - addressbook_id: &str, - use_trashbin: bool, - ) -> Result<(), rustical_store::Error> { - let addressbook = match self.get_addressbook(principal, addressbook_id).await { - Ok(addressbook) => Some(addressbook), - Err(Error::NotFound) => None, - Err(err) => return Err(err), - }; - - match use_trashbin { - true => { - sqlx::query!( - r#"UPDATE addressbooks SET deleted_at = datetime() WHERE (principal, id) = (?, ?)"#, - principal, addressbook_id - ) - .execute(&self.db) - .await.map_err(crate::Error::from)?; - } - false => { - sqlx::query!( - r#"DELETE FROM addressbooks WHERE (principal, id) = (?, ?)"#, - principal, - addressbook_id - ) - .execute(&self.db) - .await - .map_err(crate::Error::from)?; - } - }; - - if let Some(addressbook) = addressbook { - // TODO: Watch for errors here? - let _ = self.sender.try_send(CollectionOperation { - r#type: CollectionOperationType::Delete, - domain: CollectionOperationDomain::Addressbook, - topic: addressbook.push_topic, - sync_token: None, - }); - } - - Ok(()) - } - - #[instrument] - async fn restore_addressbook( - &self, - principal: &str, - addressbook_id: &str, - ) -> Result<(), rustical_store::Error> { - sqlx::query!( - r"UPDATE addressbooks SET deleted_at = NULL WHERE (principal, id) = (?, ?)", - principal, - addressbook_id - ) - .execute(&self.db) - .await - .map_err(crate::Error::from)?; - Ok(()) - } - - #[instrument] - async fn sync_changes( - &self, - principal: &str, - addressbook_id: &str, - synctoken: i64, - ) -> Result<(Vec, Vec, i64), rustical_store::Error> { - struct Row { - object_id: String, - synctoken: i64, - } - let changes = sqlx::query_as!( - Row, - r#" - SELECT DISTINCT object_id, max(0, synctoken) as "synctoken!: i64" from addressobjectchangelog - WHERE synctoken > ? - ORDER BY synctoken ASC - "#, - synctoken - ) - .fetch_all(&self.db) - .await.map_err(crate::Error::from)?; - - let mut objects = vec![]; - let mut deleted_objects = vec![]; - - let new_synctoken = changes - .last() - .map(|&Row { synctoken, .. }| synctoken) - .unwrap_or(0); - - for Row { object_id, .. } in changes { - match self.get_object(principal, addressbook_id, &object_id).await { - Ok(object) => objects.push(object), - Err(rustical_store::Error::NotFound) => deleted_objects.push(object_id), - Err(err) => return Err(err), - } - } - - Ok((objects, deleted_objects, new_synctoken)) - } - - #[instrument] - async fn get_objects( - &self, - principal: &str, - addressbook_id: &str, - ) -> Result, rustical_store::Error> { - sqlx::query_as!( - AddressObjectRow, - "SELECT id, vcf FROM addressobjects WHERE principal = ? AND addressbook_id = ? AND deleted_at IS NULL", - principal, - addressbook_id - ) - .fetch_all(&self.db) - .await.map_err(crate::Error::from)? - .into_iter() - .map(|row| row.try_into().map_err(rustical_store::Error::from)) - .collect() - } - - #[instrument] - async fn get_object( - &self, - principal: &str, - addressbook_id: &str, - object_id: &str, - ) -> Result { - Ok(sqlx::query_as!( - AddressObjectRow, - "SELECT id, vcf FROM addressobjects WHERE (principal, addressbook_id, id) = (?, ?, ?)", - principal, - addressbook_id, - object_id - ) - .fetch_one(&self.db) - .await - .map_err(crate::Error::from)? - .try_into()?) - } - - #[instrument] - async fn put_object( - &self, - principal: String, - addressbook_id: String, - object: AddressObject, - overwrite: bool, - ) -> Result<(), rustical_store::Error> { - let mut tx = self.db.begin().await.map_err(crate::Error::from)?; - - let (object_id, vcf) = (object.get_id(), object.get_vcf()); - - (if overwrite { - sqlx::query!( - "REPLACE INTO addressobjects (principal, addressbook_id, id, vcf) VALUES (?, ?, ?, ?)", - principal, - addressbook_id, - object_id, - vcf - ) - } else { - // If the object already exists a database error is thrown and handled in error.rs - sqlx::query!( - "INSERT INTO addressobjects (principal, addressbook_id, id, vcf) VALUES (?, ?, ?, ?)", - principal, - addressbook_id, - object_id, - vcf - ) - }) - .execute(&mut *tx) - .await - .map_err(crate::Error::from)?; - - let synctoken = log_object_operation( - &mut tx, - &principal, - &addressbook_id, - object_id, - ChangeOperation::Add, - ) - .await - .map_err(crate::Error::from)?; - - tx.commit().await.map_err(crate::Error::from)?; - - // TODO: Watch for errors here? - let _ = self.sender.try_send(CollectionOperation { - r#type: CollectionOperationType::Object, - domain: CollectionOperationDomain::Addressbook, - topic: self - .get_addressbook(&principal, &addressbook_id) - .await? - .push_topic, - sync_token: Some(synctoken), - }); - - Ok(()) - } - - #[instrument] - async fn delete_object( - &self, - principal: &str, - addressbook_id: &str, - object_id: &str, - use_trashbin: bool, - ) -> Result<(), rustical_store::Error> { - let mut tx = self.db.begin().await.map_err(crate::Error::from)?; - - match use_trashbin { - true => { - sqlx::query!( - "UPDATE addressobjects SET deleted_at = datetime(), updated_at = datetime() WHERE (principal, addressbook_id, id) = (?, ?, ?)", - principal, - addressbook_id, - object_id - ) - .execute(&mut *tx) - .await.map_err(crate::Error::from)?; - } - false => { - sqlx::query!( - "DELETE FROM addressobjects WHERE addressbook_id = ? AND id = ?", - addressbook_id, - object_id - ) - .execute(&mut *tx) - .await - .map_err(crate::Error::from)?; - } - }; - let synctoken = log_object_operation( - &mut tx, - principal, - addressbook_id, - object_id, - ChangeOperation::Delete, - ) - .await - .map_err(crate::Error::from)?; - - tx.commit().await.map_err(crate::Error::from)?; - - // TODO: Watch for errors here? - let _ = self.sender.try_send(CollectionOperation { - r#type: CollectionOperationType::Object, - domain: CollectionOperationDomain::Addressbook, - topic: self - .get_addressbook(principal, addressbook_id) - .await? - .push_topic, - sync_token: Some(synctoken), - }); - Ok(()) - } - - #[instrument] - async fn restore_object( - &self, - principal: &str, - addressbook_id: &str, - object_id: &str, - ) -> Result<(), rustical_store::Error> { - let mut tx = self.db.begin().await.map_err(crate::Error::from)?; - - sqlx::query!( - r#"UPDATE addressobjects SET deleted_at = NULL, updated_at = datetime() WHERE (principal, addressbook_id, id) = (?, ?, ?)"#, - principal, - addressbook_id, - object_id - ) - .execute(&mut *tx) - .await.map_err(crate::Error::from)?; - - let synctoken = log_object_operation( - &mut tx, - principal, - addressbook_id, - object_id, - ChangeOperation::Add, - ) - .await - .map_err(crate::Error::from)?; - tx.commit().await.map_err(crate::Error::from)?; - - // TODO: Watch for errors here? - let _ = self.sender.try_send(CollectionOperation { - r#type: CollectionOperationType::Object, - domain: CollectionOperationDomain::Addressbook, - topic: self - .get_addressbook(principal, addressbook_id) - .await? - .push_topic, - sync_token: Some(synctoken), - }); - - Ok(()) - } -} diff --git a/crates/store_sqlite/src/calendar_store.rs b/crates/store_sqlite/src/calendar_store.rs index c3ca341..31fb53e 100644 --- a/crates/store_sqlite/src/calendar_store.rs +++ b/crates/store_sqlite/src/calendar_store.rs @@ -5,11 +5,9 @@ use rustical_store::calendar::CalDateTime; use rustical_store::synctoken::format_synctoken; use rustical_store::{Calendar, CalendarObject, CalendarStore, Error}; use rustical_store::{CollectionOperation, CollectionOperationType}; -use sqlx::Sqlite; -use sqlx::SqlitePool; -use sqlx::Transaction; +use sqlx::{Acquire, Executor, Sqlite, SqlitePool, Transaction}; use tokio::sync::mpsc::Sender; -use tracing::instrument; +use tracing::{error, instrument}; #[derive(Debug, Clone)] struct CalendarObjectRow { @@ -25,6 +23,548 @@ impl TryFrom for CalendarObject { } } +#[derive(Debug, Constructor)] +pub struct SqliteCalendarStore { + db: SqlitePool, + sender: Sender, +} + +impl SqliteCalendarStore { + async fn _get_calendar<'e, E: Executor<'e, Database = Sqlite>>( + executor: E, + principal: &str, + id: &str, + ) -> Result { + let cal = sqlx::query_as!( + Calendar, + r#"SELECT principal, id, synctoken, "order", displayname, description, color, timezone, timezone_id, deleted_at, subscription_url, push_topic + FROM calendars + WHERE (principal, id) = (?, ?)"#, + principal, + id + ) + .fetch_one(executor) + .await.map_err(crate::Error::from)?; + Ok(cal) + } + + async fn _get_calendars<'e, E: Executor<'e, Database = Sqlite>>( + executor: E, + principal: &str, + ) -> Result, Error> { + let cals = sqlx::query_as!( + Calendar, + r#"SELECT principal, id, synctoken, displayname, "order", description, color, timezone, timezone_id, deleted_at, subscription_url, push_topic + FROM calendars + WHERE principal = ? AND deleted_at IS NULL"#, + principal + ) + .fetch_all(executor) + .await.map_err(crate::Error::from)?; + Ok(cals) + } + + async fn _get_deleted_calendars<'e, E: Executor<'e, Database = Sqlite>>( + executor: E, + principal: &str, + ) -> Result, Error> { + let cals = sqlx::query_as!( + Calendar, + r#"SELECT principal, id, synctoken, displayname, "order", description, color, timezone, timezone_id, deleted_at, subscription_url, push_topic + FROM calendars + WHERE principal = ? AND deleted_at IS NOT NULL"#, + principal + ) + .fetch_all(executor) + .await.map_err(crate::Error::from)?; + Ok(cals) + } + + async fn _insert_calendar<'e, E: Executor<'e, Database = Sqlite>>( + executor: E, + calendar: Calendar, + ) -> Result<(), Error> { + sqlx::query!( + r#"INSERT INTO calendars (principal, id, displayname, description, "order", color, timezone, timezone_id, push_topic) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"#, + calendar.principal, + calendar.id, + calendar.displayname, + calendar.description, + calendar.order, + calendar.color, + calendar.timezone, + calendar.timezone_id, + calendar.push_topic, + ) + .execute(executor) + .await.map_err(crate::Error::from)?; + Ok(()) + } + + async fn _update_calendar<'e, E: Executor<'e, Database = Sqlite>>( + executor: E, + principal: String, + id: String, + calendar: Calendar, + ) -> Result<(), Error> { + let result = sqlx::query!( + r#"UPDATE calendars SET principal = ?, id = ?, displayname = ?, description = ?, "order" = ?, color = ?, timezone = ?, timezone_id = ?, push_topic = ? + WHERE (principal, id) = (?, ?)"#, + calendar.principal, + calendar.id, + calendar.displayname, + calendar.description, + calendar.order, + calendar.color, + calendar.timezone, + calendar.timezone_id, + calendar.push_topic, + principal, + id + ).execute(executor).await.map_err(crate::Error::from)?; + if result.rows_affected() == 0 { + return Err(rustical_store::Error::NotFound); + } + Ok(()) + } + + async fn _delete_calendar<'e, E: Executor<'e, Database = Sqlite>>( + executor: E, + principal: &str, + id: &str, + use_trashbin: bool, + ) -> Result<(), Error> { + match use_trashbin { + true => sqlx::query!( + r#"UPDATE calendars SET deleted_at = datetime() WHERE (principal, id) = (?, ?)"#, + principal, + id + ) + .execute(executor) + .await + .map_err(crate::Error::from)?, + false => sqlx::query!( + r#"DELETE FROM calendars WHERE (principal, id) = (?, ?)"#, + principal, + id + ) + .execute(executor) + .await + .map_err(crate::Error::from)?, + }; + Ok(()) + } + + async fn _restore_calendar<'e, E: Executor<'e, Database = Sqlite>>( + executor: E, + principal: &str, + id: &str, + ) -> Result<(), Error> { + sqlx::query!( + r"UPDATE calendars SET deleted_at = NULL WHERE (principal, id) = (?, ?)", + principal, + id + ) + .execute(executor) + .await + .map_err(crate::Error::from)?; + Ok(()) + } + + async fn _get_objects<'e, E: Executor<'e, Database = Sqlite>>( + executor: E, + principal: &str, + cal_id: &str, + ) -> Result, Error> { + sqlx::query_as!( + CalendarObjectRow, + "SELECT id, ics FROM calendarobjects WHERE principal = ? AND cal_id = ? AND deleted_at IS NULL", + principal, + cal_id + ) + .fetch_all(executor) + .await.map_err(crate::Error::from)? + .into_iter() + .map(|row| row.try_into().map_err(rustical_store::Error::from)) + .collect() + } + + async fn _get_object<'e, E: Executor<'e, Database = Sqlite>>( + executor: E, + principal: &str, + cal_id: &str, + object_id: &str, + ) -> Result { + sqlx::query_as!( + CalendarObjectRow, + "SELECT id, ics FROM calendarobjects WHERE (principal, cal_id, id) = (?, ?, ?)", + principal, + cal_id, + object_id + ) + .fetch_one(executor) + .await + .map_err(crate::Error::from)? + .try_into() + } + + #[instrument] + async fn _put_object<'e, E: Executor<'e, Database = Sqlite>>( + executor: E, + principal: String, + cal_id: String, + object: CalendarObject, + overwrite: bool, + ) -> Result<(), Error> { + // TODO: Prevent objects from being commited to a subscription calendar + let (object_id, ics) = (object.get_id(), object.get_ics()); + + let first_occurence = object + .get_first_occurence() + .ok() + .flatten() + .as_ref() + .map(CalDateTime::date); + let last_occurence = object + .get_last_occurence() + .ok() + .flatten() + .as_ref() + .map(CalDateTime::date); + let etag = object.get_etag(); + let object_type = object.get_object_type() as u8; + + (if overwrite { + sqlx::query!( + "REPLACE INTO calendarobjects (principal, cal_id, id, ics, first_occurence, last_occurence, etag, object_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + principal, + cal_id, + object_id, + ics, + first_occurence, + last_occurence, + etag, + object_type, + ) + } else { + // If the object already exists a database error is thrown and handled in error.rs + sqlx::query!( + "INSERT INTO calendarobjects (principal, cal_id, id, ics, first_occurence, last_occurence, etag, object_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + principal, + cal_id, + object_id, + ics, + first_occurence, + last_occurence, + etag, + object_type, + ) + }) + .execute(executor) + .await + .map_err(crate::Error::from)?; + + Ok(()) + } + + #[instrument] + async fn _delete_object<'e, E: Executor<'e, Database = Sqlite>>( + executor: E, + principal: &str, + cal_id: &str, + id: &str, + use_trashbin: bool, + ) -> Result<(), Error> { + match use_trashbin { + true => { + sqlx::query!( + "UPDATE calendarobjects SET deleted_at = datetime(), updated_at = datetime() WHERE (principal, cal_id, id) = (?, ?, ?)", + principal, + cal_id, + id + ) + .execute(executor) + .await.map_err(crate::Error::from)?; + } + false => { + sqlx::query!( + "DELETE FROM calendarobjects WHERE cal_id = ? AND id = ?", + cal_id, + id + ) + .execute(executor) + .await + .map_err(crate::Error::from)?; + } + }; + Ok(()) + } + + #[instrument] + async fn _restore_object<'e, E: Executor<'e, Database = Sqlite>>( + executor: E, + principal: &str, + cal_id: &str, + object_id: &str, + ) -> Result<(), Error> { + sqlx::query!( + r#"UPDATE calendarobjects SET deleted_at = NULL, updated_at = datetime() WHERE (principal, cal_id, id) = (?, ?, ?)"#, + principal, + cal_id, + object_id + ) + .execute(executor) + .await.map_err(crate::Error::from)?; + Ok(()) + } + + async fn _sync_changes<'a, A: Acquire<'a, Database = Sqlite>>( + acquire: A, + principal: &str, + cal_id: &str, + synctoken: i64, + ) -> Result<(Vec, Vec, i64), Error> { + struct Row { + object_id: String, + synctoken: i64, + } + + let mut conn = acquire.acquire().await.map_err(crate::Error::from)?; + + let changes = sqlx::query_as!( + Row, + r#" + SELECT DISTINCT object_id, max(0, synctoken) as "synctoken!: i64" from calendarobjectchangelog + WHERE synctoken > ? + ORDER BY synctoken ASC + "#, + synctoken + ) + .fetch_all(&mut *conn) + .await.map_err(crate::Error::from)?; + + let mut objects = vec![]; + let mut deleted_objects = vec![]; + + let new_synctoken = changes + .last() + .map(|&Row { synctoken, .. }| synctoken) + .unwrap_or(0); + + for Row { object_id, .. } in changes { + match Self::_get_object(&mut *conn, principal, cal_id, &object_id).await { + Ok(object) => objects.push(object), + Err(rustical_store::Error::NotFound) => deleted_objects.push(object_id), + Err(err) => return Err(err), + } + } + + Ok((objects, deleted_objects, new_synctoken)) + } +} + +#[async_trait] +impl CalendarStore for SqliteCalendarStore { + #[instrument] + async fn get_calendar(&self, principal: &str, id: &str) -> Result { + Self::_get_calendar(&self.db, principal, id).await + } + + #[instrument] + async fn get_calendars(&self, principal: &str) -> Result, Error> { + Self::_get_calendars(&self.db, principal).await + } + + #[instrument] + async fn get_deleted_calendars(&self, principal: &str) -> Result, Error> { + Self::_get_deleted_calendars(&self.db, principal).await + } + + #[instrument] + async fn insert_calendar(&self, calendar: Calendar) -> Result<(), Error> { + Self::_insert_calendar(&self.db, calendar).await + } + + #[instrument] + async fn update_calendar( + &self, + principal: String, + id: String, + calendar: Calendar, + ) -> Result<(), Error> { + Self::_update_calendar(&self.db, principal, id, calendar).await + } + + // Does not actually delete the calendar but just disables it + #[instrument] + async fn delete_calendar( + &self, + principal: &str, + id: &str, + use_trashbin: bool, + ) -> Result<(), Error> { + let mut tx = self.db.begin().await.map_err(crate::Error::from)?; + + let cal = match Self::_get_calendar(&mut *tx, principal, id).await { + Ok(cal) => Some(cal), + Err(Error::NotFound) => None, + Err(err) => return Err(err), + }; + + Self::_delete_calendar(&mut *tx, principal, id, use_trashbin).await?; + tx.commit().await.map_err(crate::Error::from)?; + + if let Some(cal) = cal { + if let Err(err) = self.sender.try_send(CollectionOperation { + r#type: CollectionOperationType::Delete, + domain: rustical_store::CollectionOperationDomain::Calendar, + topic: cal.push_topic, + sync_token: None, + }) { + error!("Push notification about deleted calendar failed: {err}"); + }; + } + Ok(()) + } + + #[instrument] + async fn restore_calendar(&self, principal: &str, id: &str) -> Result<(), Error> { + Self::_restore_calendar(&self.db, principal, id).await + } + + #[instrument] + async fn get_objects( + &self, + principal: &str, + cal_id: &str, + ) -> Result, Error> { + Self::_get_objects(&self.db, principal, cal_id).await + } + + #[instrument] + async fn get_object( + &self, + principal: &str, + cal_id: &str, + object_id: &str, + ) -> Result { + Self::_get_object(&self.db, principal, cal_id, object_id).await + } + + #[instrument] + async fn put_object( + &self, + principal: String, + cal_id: String, + object: CalendarObject, + overwrite: bool, + ) -> Result<(), Error> { + // TODO: Prevent objects from being commited to a subscription calendar + let mut tx = self.db.begin().await.map_err(crate::Error::from)?; + + let object_id = object.get_id().to_owned(); + + Self::_put_object( + &mut *tx, + principal.to_owned(), + cal_id.to_owned(), + object, + overwrite, + ) + .await?; + + let synctoken = log_object_operation( + &mut tx, + &principal, + &cal_id, + &object_id, + ChangeOperation::Add, + ) + .await?; + + tx.commit().await.map_err(crate::Error::from)?; + + if let Err(err) = self.sender.try_send(CollectionOperation { + r#type: CollectionOperationType::Object, + domain: rustical_store::CollectionOperationDomain::Calendar, + topic: self.get_calendar(&principal, &cal_id).await?.push_topic, + sync_token: Some(synctoken), + }) { + error!("Push notification about deleted calendar failed: {err}"); + }; + Ok(()) + } + + #[instrument] + async fn delete_object( + &self, + principal: &str, + cal_id: &str, + id: &str, + use_trashbin: bool, + ) -> Result<(), Error> { + let mut tx = self.db.begin().await.map_err(crate::Error::from)?; + + Self::_delete_object(&mut *tx, principal, cal_id, id, use_trashbin).await?; + + let synctoken = + log_object_operation(&mut tx, principal, cal_id, id, ChangeOperation::Delete).await?; + tx.commit().await.map_err(crate::Error::from)?; + + if let Err(err) = self.sender.try_send(CollectionOperation { + r#type: CollectionOperationType::Object, + domain: rustical_store::CollectionOperationDomain::Calendar, + topic: self.get_calendar(principal, cal_id).await?.push_topic, + sync_token: Some(synctoken), + }) { + error!("Push notification about deleted calendar failed: {err}"); + }; + Ok(()) + } + + #[instrument] + async fn restore_object( + &self, + principal: &str, + cal_id: &str, + object_id: &str, + ) -> Result<(), Error> { + let mut tx = self.db.begin().await.map_err(crate::Error::from)?; + + Self::_restore_object(&mut *tx, principal, cal_id, object_id).await?; + + let synctoken = + log_object_operation(&mut tx, principal, cal_id, object_id, ChangeOperation::Add) + .await?; + tx.commit().await.map_err(crate::Error::from)?; + + if let Err(err) = self.sender.try_send(CollectionOperation { + r#type: CollectionOperationType::Object, + domain: rustical_store::CollectionOperationDomain::Calendar, + topic: self.get_calendar(principal, cal_id).await?.push_topic, + sync_token: Some(synctoken), + }) { + error!("Push notification about deleted calendar failed: {err}"); + }; + Ok(()) + } + + #[instrument] + async fn sync_changes( + &self, + principal: &str, + cal_id: &str, + synctoken: i64, + ) -> Result<(Vec, Vec, i64), Error> { + Self::_sync_changes(&self.db, principal, cal_id, synctoken).await + } + + fn is_read_only(&self) -> bool { + false + } +} + // Logs an operation to the events async fn log_object_operation( tx: &mut Transaction<'_, Sqlite>, @@ -66,404 +606,3 @@ async fn log_object_operation( .map_err(crate::Error::from)?; Ok(format_synctoken(synctoken)) } - -#[derive(Debug, Constructor)] -pub struct SqliteCalendarStore { - db: SqlitePool, - sender: Sender, -} - -#[async_trait] -impl CalendarStore for SqliteCalendarStore { - #[instrument] - async fn get_calendar(&self, principal: &str, id: &str) -> Result { - let cal = sqlx::query_as!( - Calendar, - r#"SELECT principal, id, synctoken, "order", displayname, description, color, timezone, timezone_id, deleted_at, subscription_url, push_topic - FROM calendars - WHERE (principal, id) = (?, ?)"#, - principal, - id - ) - .fetch_one(&self.db) - .await.map_err(crate::Error::from)?; - Ok(cal) - } - - #[instrument] - async fn get_calendars(&self, principal: &str) -> Result, Error> { - let cals = sqlx::query_as!( - Calendar, - r#"SELECT principal, id, synctoken, displayname, "order", description, color, timezone, timezone_id, deleted_at, subscription_url, push_topic - FROM calendars - WHERE principal = ? AND deleted_at IS NULL"#, - principal - ) - .fetch_all(&self.db) - .await.map_err(crate::Error::from)?; - Ok(cals) - } - - #[instrument] - async fn get_deleted_calendars(&self, principal: &str) -> Result, Error> { - let cals = sqlx::query_as!( - Calendar, - r#"SELECT principal, id, synctoken, displayname, "order", description, color, timezone, timezone_id, deleted_at, subscription_url, push_topic - FROM calendars - WHERE principal = ? AND deleted_at IS NOT NULL"#, - principal - ) - .fetch_all(&self.db) - .await.map_err(crate::Error::from)?; - Ok(cals) - } - - #[instrument] - async fn insert_calendar(&self, calendar: Calendar) -> Result<(), Error> { - sqlx::query!( - r#"INSERT INTO calendars (principal, id, displayname, description, "order", color, timezone, timezone_id, push_topic) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"#, - calendar.principal, - calendar.id, - calendar.displayname, - calendar.description, - calendar.order, - calendar.color, - calendar.timezone, - calendar.timezone_id, - calendar.push_topic, - ) - .execute(&self.db) - .await.map_err(crate::Error::from)?; - Ok(()) - } - - #[instrument] - async fn update_calendar( - &self, - principal: String, - id: String, - calendar: Calendar, - ) -> Result<(), Error> { - let result = sqlx::query!( - r#"UPDATE calendars SET principal = ?, id = ?, displayname = ?, description = ?, "order" = ?, color = ?, timezone = ?, timezone_id = ?, push_topic = ? - WHERE (principal, id) = (?, ?)"#, - calendar.principal, - calendar.id, - calendar.displayname, - calendar.description, - calendar.order, - calendar.color, - calendar.timezone, - calendar.timezone_id, - calendar.push_topic, - principal, - id - ).execute(&self.db).await.map_err(crate::Error::from)?; - if result.rows_affected() == 0 { - return Err(rustical_store::Error::NotFound); - } - Ok(()) - } - - // Does not actually delete the calendar but just disables it - #[instrument] - async fn delete_calendar( - &self, - principal: &str, - id: &str, - use_trashbin: bool, - ) -> Result<(), Error> { - let cal = match self.get_calendar(principal, id).await { - Ok(cal) => Some(cal), - Err(Error::NotFound) => None, - Err(err) => return Err(err), - }; - - match use_trashbin { - true => { - sqlx::query!( - r#"UPDATE calendars SET deleted_at = datetime() WHERE (principal, id) = (?, ?)"#, - principal, id - ) - .execute(&self.db) - .await.map_err(crate::Error::from)?; - } - false => { - sqlx::query!( - r#"DELETE FROM calendars WHERE (principal, id) = (?, ?)"#, - principal, - id - ) - .execute(&self.db) - .await - .map_err(crate::Error::from)?; - } - }; - - if let Some(cal) = cal { - // TODO: Watch for errors here? - let _ = self.sender.try_send(CollectionOperation { - r#type: CollectionOperationType::Delete, - domain: rustical_store::CollectionOperationDomain::Calendar, - topic: cal.push_topic, - sync_token: None, - }); - } - Ok(()) - } - - #[instrument] - async fn restore_calendar(&self, principal: &str, id: &str) -> Result<(), Error> { - sqlx::query!( - r"UPDATE calendars SET deleted_at = NULL WHERE (principal, id) = (?, ?)", - principal, - id - ) - .execute(&self.db) - .await - .map_err(crate::Error::from)?; - Ok(()) - } - - #[instrument] - async fn get_objects( - &self, - principal: &str, - cal_id: &str, - ) -> Result, Error> { - sqlx::query_as!( - CalendarObjectRow, - "SELECT id, ics FROM calendarobjects WHERE principal = ? AND cal_id = ? AND deleted_at IS NULL", - principal, - cal_id - ) - .fetch_all(&self.db) - .await.map_err(crate::Error::from)? - .into_iter() - .map(|row| row.try_into().map_err(rustical_store::Error::from)) - .collect() - } - - #[instrument] - async fn get_object( - &self, - principal: &str, - cal_id: &str, - object_id: &str, - ) -> Result { - Ok(sqlx::query_as!( - CalendarObjectRow, - "SELECT id, ics FROM calendarobjects WHERE (principal, cal_id, id) = (?, ?, ?)", - principal, - cal_id, - object_id - ) - .fetch_one(&self.db) - .await - .map_err(crate::Error::from)? - .try_into()?) - } - - #[instrument] - async fn put_object( - &self, - principal: String, - cal_id: String, - object: CalendarObject, - overwrite: bool, - ) -> Result<(), Error> { - // TODO: Prevent objects from being commited to a subscription calendar - let mut tx = self.db.begin().await.map_err(crate::Error::from)?; - - let (object_id, ics) = (object.get_id(), object.get_ics()); - - let first_occurence = object - .get_first_occurence() - .ok() - .flatten() - .as_ref() - .map(CalDateTime::date); - let last_occurence = object - .get_last_occurence() - .ok() - .flatten() - .as_ref() - .map(CalDateTime::date); - let etag = object.get_etag(); - let object_type = object.get_object_type() as u8; - - (if overwrite { - sqlx::query!( - "REPLACE INTO calendarobjects (principal, cal_id, id, ics, first_occurence, last_occurence, etag, object_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", - principal, - cal_id, - object_id, - ics, - first_occurence, - last_occurence, - etag, - object_type, - ) - } else { - // If the object already exists a database error is thrown and handled in error.rs - sqlx::query!( - "INSERT INTO calendarobjects (principal, cal_id, id, ics, first_occurence, last_occurence, etag, object_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", - principal, - cal_id, - object_id, - ics, - first_occurence, - last_occurence, - etag, - object_type, - ) - }) - .execute(&mut *tx) - .await - .map_err(crate::Error::from)?; - - let synctoken = log_object_operation( - &mut tx, - &principal, - &cal_id, - object_id, - ChangeOperation::Add, - ) - .await?; - - tx.commit().await.map_err(crate::Error::from)?; - - // TODO: Watch for errors here? - let _ = self.sender.try_send(CollectionOperation { - r#type: CollectionOperationType::Object, - domain: rustical_store::CollectionOperationDomain::Calendar, - topic: self.get_calendar(&principal, &cal_id).await?.push_topic, - sync_token: Some(synctoken), - }); - Ok(()) - } - - #[instrument] - async fn delete_object( - &self, - principal: &str, - cal_id: &str, - id: &str, - use_trashbin: bool, - ) -> Result<(), Error> { - let mut tx = self.db.begin().await.map_err(crate::Error::from)?; - - match use_trashbin { - true => { - sqlx::query!( - "UPDATE calendarobjects SET deleted_at = datetime(), updated_at = datetime() WHERE (principal, cal_id, id) = (?, ?, ?)", - principal, - cal_id, - id - ) - .execute(&mut *tx) - .await.map_err(crate::Error::from)?; - } - false => { - sqlx::query!( - "DELETE FROM calendarobjects WHERE cal_id = ? AND id = ?", - cal_id, - id - ) - .execute(&mut *tx) - .await - .map_err(crate::Error::from)?; - } - }; - let synctoken = - log_object_operation(&mut tx, principal, cal_id, id, ChangeOperation::Delete).await?; - tx.commit().await.map_err(crate::Error::from)?; - // TODO: Watch for errors here? - let _ = self.sender.try_send(CollectionOperation { - r#type: CollectionOperationType::Object, - domain: rustical_store::CollectionOperationDomain::Calendar, - topic: self.get_calendar(principal, cal_id).await?.push_topic, - sync_token: Some(synctoken), - }); - Ok(()) - } - - #[instrument] - async fn restore_object( - &self, - principal: &str, - cal_id: &str, - object_id: &str, - ) -> Result<(), Error> { - let mut tx = self.db.begin().await.map_err(crate::Error::from)?; - - sqlx::query!( - r#"UPDATE calendarobjects SET deleted_at = NULL, updated_at = datetime() WHERE (principal, cal_id, id) = (?, ?, ?)"#, - principal, - cal_id, - object_id - ) - .execute(&mut *tx) - .await.map_err(crate::Error::from)?; - - let synctoken = - log_object_operation(&mut tx, principal, cal_id, object_id, ChangeOperation::Add) - .await?; - tx.commit().await.map_err(crate::Error::from)?; - // TODO: Watch for errors here? - let _ = self.sender.try_send(CollectionOperation { - r#type: CollectionOperationType::Object, - domain: rustical_store::CollectionOperationDomain::Calendar, - topic: self.get_calendar(principal, cal_id).await?.push_topic, - sync_token: Some(synctoken), - }); - Ok(()) - } - - #[instrument] - async fn sync_changes( - &self, - principal: &str, - cal_id: &str, - synctoken: i64, - ) -> Result<(Vec, Vec, i64), Error> { - struct Row { - object_id: String, - synctoken: i64, - } - let changes = sqlx::query_as!( - Row, - r#" - SELECT DISTINCT object_id, max(0, synctoken) as "synctoken!: i64" from calendarobjectchangelog - WHERE synctoken > ? - ORDER BY synctoken ASC - "#, - synctoken - ) - .fetch_all(&self.db) - .await.map_err(crate::Error::from)?; - - let mut objects = vec![]; - let mut deleted_objects = vec![]; - - let new_synctoken = changes - .last() - .map(|&Row { synctoken, .. }| synctoken) - .unwrap_or(0); - - for Row { object_id, .. } in changes { - match self.get_object(principal, cal_id, &object_id).await { - Ok(object) => objects.push(object), - Err(rustical_store::Error::NotFound) => deleted_objects.push(object_id), - Err(err) => return Err(err), - } - } - - Ok((objects, deleted_objects, new_synctoken)) - } - - fn is_read_only(&self) -> bool { - false - } -}