use super::ChangeOperation; use crate::BEGIN_IMMEDIATE; use async_trait::async_trait; use derive_more::derive::Constructor; use ical::parser::ParserError; use rustical_ical::AddressObject; use rustical_store::{ Addressbook, AddressbookStore, CollectionMetadata, CollectionOperation, CollectionOperationInfo, Error, synctoken::format_synctoken, }; use sqlx::{Acquire, Executor, Sqlite, SqlitePool, Transaction}; use tokio::sync::mpsc::Sender; use tracing::{error, error_span, instrument, warn}; pub mod birthday_calendar; #[derive(Debug, Clone)] struct AddressObjectRow { id: String, vcf: String, } impl From for (String, Result) { fn from(row: AddressObjectRow) -> Self { let result = AddressObject::from_vcf(row.vcf); (row.id, result) } } impl TryFrom for (String, AddressObject) { type Error = rustical_store::Error; fn try_from(value: AddressObjectRow) -> Result { Ok((value.id, AddressObject::from_vcf(value.vcf)?)) } } #[derive(Debug, Clone, Constructor)] pub struct SqliteAddressbookStore { db: SqlitePool, sender: Sender, skip_broken: bool, } impl SqliteAddressbookStore { // Commit "orphaned" objects to the changelog table pub async fn repair_orphans(&self) -> Result<(), Error> { struct Row { principal: String, addressbook_id: String, id: String, deleted: bool, } let mut tx = self .db .begin_with(BEGIN_IMMEDIATE) .await .map_err(crate::Error::from)?; let rows = sqlx::query_as!( Row, r#" SELECT principal, addressbook_id, id, (deleted_at IS NOT NULL) AS "deleted: bool" FROM addressobjects WHERE (principal, addressbook_id, id) NOT IN ( SELECT DISTINCT principal, addressbook_id, object_id FROM addressobjectchangelog ) ; "#, ) .fetch_all(&mut *tx) .await .map_err(crate::Error::from)?; for row in rows { let operation = if row.deleted { ChangeOperation::Delete } else { ChangeOperation::Add }; warn!( "Commiting orphaned addressbook object ({},{},{}), deleted={}", &row.principal, &row.addressbook_id, &row.id, &row.deleted ); Self::log_object_operation( &mut tx, &row.principal, &row.addressbook_id, &row.id, operation, ) .await?; } tx.commit().await.map_err(crate::Error::from)?; Ok(()) } #[allow(clippy::missing_panics_doc)] pub async fn validate_objects(&self, principal: &str) -> Result<(), Error> { let mut success = true; for addressbook in self.get_addressbooks(principal).await? { for (object_id, res) in Self::_get_objects(&self.db, principal, &addressbook.id).await? { if let Err(err) = res { warn!( "Invalid address object found at {principal}/{addr_id}/{object_id}.vcf. Error: {err}", addr_id = addressbook.id ); success = false; } } } if !success { if self.skip_broken { error!( "Not all address objects are valid. Since data_store.sqlite.skip_broken=true they will be hidden. You are still advised to manually remove or repair the object. If you need help feel free to open up an issue on GitHub." ); } else { error!( "Not all address objects are valid. Since data_store.sqlite.skip_broken=false this causes a panic. Remove or repair the broken objects manually or set data_store.sqlite.skip_broken=false as a temporary solution to ignore the error. If you need help feel free to open up an issue on GitHub." ); panic!(); } } Ok(()) } // Logs an operation to an address object async fn log_object_operation( tx: &mut Transaction<'_, Sqlite>, principal: &str, addressbook_id: &str, object_id: &str, operation: ChangeOperation, ) -> Result { struct Synctoken { synctoken: i64, } let Synctoken { synctoken } = sqlx::query_as!( Synctoken, r#" UPDATE addressbooks SET synctoken = synctoken + 1 WHERE (principal, id) = (?1, ?2) RETURNING synctoken"#, principal, addressbook_id ) .fetch_one(&mut **tx) .await .map_err(crate::Error::from)?; sqlx::query!( r#" INSERT INTO addressobjectchangelog (principal, addressbook_id, object_id, "operation", synctoken) VALUES (?1, ?2, ?3, ?4, ( SELECT synctoken FROM addressbooks WHERE (principal, id) = (?1, ?2) ))"#, principal, addressbook_id, object_id, operation ) .execute(&mut **tx) .await .map_err(crate::Error::from)?; Ok(format_synctoken(synctoken)) } fn send_push_notification(&self, data: CollectionOperationInfo, topic: String) { if let Err(err) = self.sender.try_send(CollectionOperation { topic, data }) { error_span!( "Error trying to send addressbook update notification:", err = format!("{err}"), ); } } async fn _get_addressbook<'e, E: Executor<'e, Database = Sqlite>>( executor: E, principal: &str, id: &str, show_deleted: bool, ) -> Result { let addressbook = sqlx::query_as!( Addressbook, r#"SELECT principal, id, synctoken, displayname, description, deleted_at, push_topic FROM addressbooks WHERE (principal, id) = (?, ?) AND ((deleted_at IS NULL) OR ?) "#, principal, id, show_deleted ) .fetch_one(executor) .await .map_err(crate::Error::from)?; Ok(addressbook) } async fn _get_addressbooks<'e, E: Executor<'e, Database = Sqlite>>( executor: E, principal: &str, ) -> Result, rustical_store::Error> { let addressbooks = sqlx::query_as!( Addressbook, r#"SELECT principal, id, synctoken, displayname, description, deleted_at, push_topic FROM addressbooks WHERE principal = ? AND deleted_at IS NULL"#, principal ) .fetch_all(executor) .await .map_err(crate::Error::from)?; Ok(addressbooks) } async fn _get_deleted_addressbooks<'e, E: Executor<'e, Database = Sqlite>>( executor: E, principal: &str, ) -> Result, rustical_store::Error> { let addressbooks = sqlx::query_as!( Addressbook, r#"SELECT principal, id, synctoken, displayname, description, deleted_at, push_topic FROM addressbooks WHERE principal = ? AND deleted_at IS NOT NULL"#, principal ) .fetch_all(executor) .await .map_err(crate::Error::from)?; Ok(addressbooks) } async fn _update_addressbook<'e, E: Executor<'e, Database = Sqlite>>( executor: E, principal: &str, id: &str, addressbook: &Addressbook, ) -> Result<(), rustical_store::Error> { let result = sqlx::query!( r#"UPDATE addressbooks SET principal = ?, id = ?, displayname = ?, description = ?, push_topic = ? WHERE (principal, id) = (?, ?)"#, addressbook.principal, addressbook.id, addressbook.displayname, addressbook.description, addressbook.push_topic, principal, id ) .execute(executor) .await .map_err(crate::Error::from)?; if result.rows_affected() == 0 { return Err(rustical_store::Error::NotFound); } Ok(()) } async fn _insert_addressbook<'e, E: Executor<'e, Database = Sqlite>>( executor: E, addressbook: &Addressbook, ) -> Result<(), rustical_store::Error> { sqlx::query!( r#"INSERT INTO addressbooks (principal, id, displayname, description, push_topic) VALUES (?, ?, ?, ?, ?)"#, addressbook.principal, addressbook.id, addressbook.displayname, addressbook.description, addressbook.push_topic, ) .execute(executor) .await .map_err(crate::Error::from)?; Ok(()) } async fn _delete_addressbook<'e, E: Executor<'e, Database = Sqlite>>( executor: E, principal: &str, addressbook_id: &str, use_trashbin: bool, ) -> Result<(), rustical_store::Error> { if use_trashbin { sqlx::query!( r#"UPDATE addressbooks SET deleted_at = datetime() WHERE (principal, id) = (?, ?)"#, principal, addressbook_id ) .execute(executor) .await .map_err(crate::Error::from)?; } else { sqlx::query!( r#"DELETE FROM addressbooks WHERE (principal, id) = (?, ?)"#, principal, addressbook_id ) .execute(executor) .await .map_err(crate::Error::from)?; } Ok(()) } async fn _restore_addressbook<'e, E: Executor<'e, Database = Sqlite>>( executor: E, principal: &str, addressbook_id: &str, ) -> Result<(), rustical_store::Error> { sqlx::query!( r"UPDATE addressbooks SET deleted_at = NULL WHERE (principal, id) = (?, ?)", principal, addressbook_id ) .execute(executor) .await .map_err(crate::Error::from)?; Ok(()) } async fn _sync_changes<'a, A: Acquire<'a, Database = Sqlite>>( acquire: A, principal: &str, addressbook_id: &str, synctoken: i64, ) -> Result<(Vec<(String, AddressObject)>, Vec, i64), rustical_store::Error> { struct Row { object_id: String, synctoken: i64, } let mut conn = acquire.acquire().await.map_err(crate::Error::from)?; let changes = sqlx::query_as!( Row, r#" SELECT DISTINCT object_id, max(0, synctoken) as "synctoken!: i64" from addressobjectchangelog WHERE synctoken > ? ORDER BY synctoken ASC "#, synctoken ) .fetch_all(&mut *conn) .await.map_err(crate::Error::from)?; let mut objects = vec![]; let mut deleted_objects = vec![]; let new_synctoken = changes.last().map_or(0, |&Row { synctoken, .. }| synctoken); for Row { object_id, .. } in changes { match Self::_get_object(&mut *conn, principal, addressbook_id, &object_id, false).await { Ok(object) => objects.push((object_id, object)), Err(rustical_store::Error::NotFound) => deleted_objects.push(object_id), Err(err) => return Err(err), } } Ok((objects, deleted_objects, new_synctoken)) } async fn _list_objects<'e, E: Executor<'e, Database = Sqlite>>( executor: E, principal: &str, addressbook_id: &str, ) -> Result, rustical_store::Error> { struct ObjectEntry { length: u64, deleted: bool, } Ok(sqlx::query_as!( ObjectEntry, "SELECT length(vcf) AS 'length!: u64', deleted_at AS 'deleted!: bool' FROM addressobjects WHERE principal = ? AND addressbook_id = ?", principal, addressbook_id ) .fetch_all(executor) .await.map_err(crate::Error::from)? .into_iter() .map(|row| (row.length, row.deleted)) .collect()) } async fn _get_objects<'e, E: Executor<'e, Database = Sqlite>>( executor: E, principal: &str, addressbook_id: &str, ) -> Result)>, Error> { Ok(sqlx::query_as!( AddressObjectRow, "SELECT id, vcf FROM addressobjects WHERE principal = ? AND addressbook_id = ? AND deleted_at IS NULL", principal, addressbook_id ) .fetch_all(executor) .await.map_err(crate::Error::from)? .into_iter() .map(Into::into) ) } async fn _get_object<'e, E: Executor<'e, Database = Sqlite>>( executor: E, principal: &str, addressbook_id: &str, object_id: &str, show_deleted: bool, ) -> Result { let (id, object) = sqlx::query_as!( AddressObjectRow, "SELECT id, vcf FROM addressobjects WHERE (principal, addressbook_id, id) = (?, ?, ?) AND ((deleted_at IS NULL) OR ?)", principal, addressbook_id, object_id, show_deleted ) .fetch_one(executor) .await .map_err(crate::Error::from)? .try_into()?; assert_eq!(id, object_id); Ok(object) } async fn _put_object<'e, E: Executor<'e, Database = Sqlite>>( executor: E, principal: &str, addressbook_id: &str, object_id: &str, object: &AddressObject, overwrite: bool, ) -> Result<(), rustical_store::Error> { let vcf = object.get_vcf(); (if overwrite { sqlx::query!( "REPLACE INTO addressobjects (principal, addressbook_id, id, vcf) VALUES (?, ?, ?, ?)", principal, addressbook_id, object_id, vcf ) } else { // If the object already exists a database error is thrown and handled in error.rs sqlx::query!( "INSERT INTO addressobjects (principal, addressbook_id, id, vcf) VALUES (?, ?, ?, ?)", principal, addressbook_id, object_id, vcf ) }) .execute(executor) .await .map_err(crate::Error::from)?; Ok(()) } async fn _delete_object<'e, E: Executor<'e, Database = Sqlite>>( executor: E, principal: &str, addressbook_id: &str, object_id: &str, use_trashbin: bool, ) -> Result<(), rustical_store::Error> { if use_trashbin { sqlx::query!( "UPDATE addressobjects SET deleted_at = datetime(), updated_at = datetime() WHERE (principal, addressbook_id, id) = (?, ?, ?)", principal, addressbook_id, object_id ) .execute(executor) .await.map_err(crate::Error::from)?; } else { sqlx::query!( "DELETE FROM addressobjects WHERE addressbook_id = ? AND id = ?", addressbook_id, object_id ) .execute(executor) .await .map_err(crate::Error::from)?; } Ok(()) } async fn _restore_object<'e, E: Executor<'e, Database = Sqlite>>( executor: E, principal: &str, addressbook_id: &str, object_id: &str, ) -> Result<(), rustical_store::Error> { sqlx::query!( r#"UPDATE addressobjects SET deleted_at = NULL, updated_at = datetime() WHERE (principal, addressbook_id, id) = (?, ?, ?)"#, principal, addressbook_id, object_id ) .execute(executor) .await.map_err(crate::Error::from)?; Ok(()) } } #[async_trait] impl AddressbookStore for SqliteAddressbookStore { #[instrument] async fn get_addressbook( &self, principal: &str, id: &str, show_deleted: bool, ) -> Result { Self::_get_addressbook(&self.db, principal, id, show_deleted).await } #[instrument] async fn get_addressbooks( &self, principal: &str, ) -> Result, rustical_store::Error> { Self::_get_addressbooks(&self.db, principal).await } #[instrument] async fn get_deleted_addressbooks( &self, principal: &str, ) -> Result, rustical_store::Error> { Self::_get_deleted_addressbooks(&self.db, principal).await } #[instrument] async fn update_addressbook( &self, principal: &str, id: &str, addressbook: Addressbook, ) -> Result<(), rustical_store::Error> { assert_eq!(principal, &addressbook.principal); assert_eq!(id, &addressbook.id); Self::_update_addressbook(&self.db, principal, id, &addressbook).await } #[instrument] async fn insert_addressbook( &self, addressbook: Addressbook, ) -> Result<(), rustical_store::Error> { let mut tx = self .db .begin_with(BEGIN_IMMEDIATE) .await .map_err(crate::Error::from)?; Self::_insert_addressbook(&mut *tx, &addressbook).await?; let birthday_cal = Self::default_birthday_calendar(addressbook); Self::_insert_birthday_calendar(&mut *tx, &birthday_cal).await?; tx.commit().await.map_err(crate::Error::from)?; Ok(()) } #[instrument] async fn delete_addressbook( &self, principal: &str, addressbook_id: &str, use_trashbin: bool, ) -> Result<(), rustical_store::Error> { let mut tx = self .db .begin_with(BEGIN_IMMEDIATE) .await .map_err(crate::Error::from)?; let addressbook = match Self::_get_addressbook(&mut *tx, principal, addressbook_id, use_trashbin).await { Ok(addressbook) => Some(addressbook), Err(Error::NotFound) => None, Err(err) => return Err(err), }; Self::_delete_addressbook(&mut *tx, principal, addressbook_id, use_trashbin).await?; tx.commit().await.map_err(crate::Error::from)?; if let Some(addressbook) = addressbook { self.send_push_notification(CollectionOperationInfo::Delete, addressbook.push_topic); } Ok(()) } #[instrument] async fn restore_addressbook( &self, principal: &str, addressbook_id: &str, ) -> Result<(), rustical_store::Error> { Self::_restore_addressbook(&self.db, principal, addressbook_id).await } #[instrument] async fn sync_changes( &self, principal: &str, addressbook_id: &str, synctoken: i64, ) -> Result<(Vec<(String, AddressObject)>, Vec, i64), rustical_store::Error> { Self::_sync_changes(&self.db, principal, addressbook_id, synctoken).await } #[instrument] async fn addressbook_metadata( &self, principal: &str, addressbook_id: &str, ) -> Result { let mut sizes = vec![]; let mut deleted_sizes = vec![]; for (size, deleted) in Self::_list_objects(&self.db, principal, addressbook_id).await? { if deleted { deleted_sizes.push(size); } else { sizes.push(size); } } Ok(CollectionMetadata { len: sizes.len(), deleted_len: deleted_sizes.len(), size: sizes.iter().sum(), deleted_size: deleted_sizes.iter().sum(), }) } #[instrument] async fn get_objects( &self, principal: &str, addressbook_id: &str, ) -> Result, rustical_store::Error> { let objects = Self::_get_objects(&self.db, principal, addressbook_id).await?; if self.skip_broken { Ok(objects .filter_map(|(id, res)| Some((id, res.ok()?))) .collect()) } else { Ok(objects .map(|(id, res)| res.map(|obj| (id, obj))) .collect::, _>>()?) } } #[instrument] async fn get_object( &self, principal: &str, addressbook_id: &str, object_id: &str, show_deleted: bool, ) -> Result { Self::_get_object(&self.db, principal, addressbook_id, object_id, show_deleted).await } #[instrument] async fn put_object( &self, principal: &str, addressbook_id: &str, object_id: &str, object: AddressObject, overwrite: bool, ) -> Result<(), rustical_store::Error> { let mut tx = self .db .begin_with(BEGIN_IMMEDIATE) .await .map_err(crate::Error::from)?; Self::_put_object( &mut *tx, principal, addressbook_id, object_id, &object, overwrite, ) .await?; let sync_token = Self::log_object_operation( &mut tx, principal, addressbook_id, object_id, ChangeOperation::Add, ) .await .map_err(crate::Error::from)?; tx.commit().await.map_err(crate::Error::from)?; self.send_push_notification( CollectionOperationInfo::Content { sync_token }, self.get_addressbook(principal, addressbook_id, false) .await? .push_topic, ); Ok(()) } #[instrument] async fn delete_object( &self, principal: &str, addressbook_id: &str, object_id: &str, use_trashbin: bool, ) -> Result<(), rustical_store::Error> { let mut tx = self .db .begin_with(BEGIN_IMMEDIATE) .await .map_err(crate::Error::from)?; Self::_delete_object(&mut *tx, principal, addressbook_id, object_id, use_trashbin).await?; let sync_token = Self::log_object_operation( &mut tx, principal, addressbook_id, object_id, ChangeOperation::Delete, ) .await .map_err(crate::Error::from)?; tx.commit().await.map_err(crate::Error::from)?; self.send_push_notification( CollectionOperationInfo::Content { sync_token }, self.get_addressbook(principal, addressbook_id, false) .await? .push_topic, ); Ok(()) } #[instrument] async fn restore_object( &self, principal: &str, addressbook_id: &str, object_id: &str, ) -> Result<(), rustical_store::Error> { let mut tx = self .db .begin_with(BEGIN_IMMEDIATE) .await .map_err(crate::Error::from)?; Self::_restore_object(&mut *tx, principal, addressbook_id, object_id).await?; let sync_token = Self::log_object_operation( &mut tx, principal, addressbook_id, object_id, ChangeOperation::Add, ) .await .map_err(crate::Error::from)?; tx.commit().await.map_err(crate::Error::from)?; self.send_push_notification( CollectionOperationInfo::Content { sync_token }, self.get_addressbook(principal, addressbook_id, false) .await? .push_topic, ); Ok(()) } #[instrument(skip(objects))] async fn import_addressbook( &self, addressbook: Addressbook, objects: Vec<(String, AddressObject)>, merge_existing: bool, ) -> Result<(), Error> { let mut tx = self .db .begin_with(BEGIN_IMMEDIATE) .await .map_err(crate::Error::from)?; let existing = match Self::_get_addressbook(&mut *tx, &addressbook.principal, &addressbook.id, true) .await { Ok(addressbook) => Some(addressbook), Err(Error::NotFound) => None, Err(err) => return Err(err), }; if existing.is_some() && !merge_existing { return Err(Error::AlreadyExists); } if existing.is_none() { Self::_insert_addressbook(&mut *tx, &addressbook).await?; } let mut sync_token = None; for (object_id, object) in objects { Self::_put_object( &mut *tx, &addressbook.principal, &addressbook.id, &object_id, &object, false, ) .await?; sync_token = Some( Self::log_object_operation( &mut tx, &addressbook.principal, &addressbook.id, &object_id, ChangeOperation::Add, ) .await?, ); } tx.commit().await.map_err(crate::Error::from)?; if let Some(sync_token) = sync_token { self.send_push_notification( CollectionOperationInfo::Content { sync_token }, self.get_addressbook(&addressbook.principal, &addressbook.id, true) .await? .push_topic, ); } Ok(()) } }