From a686286d06e1f2e2b60dda2da4303bb43cb75581 Mon Sep 17 00:00:00 2001 From: Lennart <18233294+lennart-k@users.noreply.github.com> Date: Wed, 10 Dec 2025 10:44:41 +0100 Subject: [PATCH] sqlite_store: Refactor notification logic --- ...669d3faed66a75fbfc7cd93e5f64b778f45ab.json | 12 ++ ...7ec11f81fd86d3b02fc1df07a036c6b47ace2.json | 12 -- .../store_sqlite/src/addressbook_store/mod.rs | 160 ++++++++--------- crates/store_sqlite/src/calendar_store.rs | 162 +++++++++--------- crates/store_sqlite/src/subscription_store.rs | 2 +- 5 files changed, 168 insertions(+), 180 deletions(-) create mode 100644 .sqlx/query-583069cbeba5285c63c2b95e989669d3faed66a75fbfc7cd93e5f64b778f45ab.json delete mode 100644 .sqlx/query-6d08d3a014743da9b445ab012437ec11f81fd86d3b02fc1df07a036c6b47ace2.json diff --git a/.sqlx/query-583069cbeba5285c63c2b95e989669d3faed66a75fbfc7cd93e5f64b778f45ab.json b/.sqlx/query-583069cbeba5285c63c2b95e989669d3faed66a75fbfc7cd93e5f64b778f45ab.json new file mode 100644 index 0000000..ae1947f --- /dev/null +++ b/.sqlx/query-583069cbeba5285c63c2b95e989669d3faed66a75fbfc7cd93e5f64b778f45ab.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "REPLACE INTO davpush_subscriptions (id, topic, expiration, push_resource, public_key, public_key_type, auth_secret) VALUES (?, ?, ?, ?, ?, ?, ?)", + "describe": { + "columns": [], + "parameters": { + "Right": 7 + }, + "nullable": [] + }, + "hash": "583069cbeba5285c63c2b95e989669d3faed66a75fbfc7cd93e5f64b778f45ab" +} diff --git a/.sqlx/query-6d08d3a014743da9b445ab012437ec11f81fd86d3b02fc1df07a036c6b47ace2.json b/.sqlx/query-6d08d3a014743da9b445ab012437ec11f81fd86d3b02fc1df07a036c6b47ace2.json deleted file mode 100644 index b46345f..0000000 --- a/.sqlx/query-6d08d3a014743da9b445ab012437ec11f81fd86d3b02fc1df07a036c6b47ace2.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "INSERT OR REPLACE INTO davpush_subscriptions (id, topic, expiration, push_resource, public_key, public_key_type, auth_secret) VALUES (?, ?, ?, ?, ?, ?, ?)", - "describe": { - "columns": [], - "parameters": { - "Right": 7 - }, - "nullable": [] - }, - "hash": "6d08d3a014743da9b445ab012437ec11f81fd86d3b02fc1df07a036c6b47ace2" -} diff --git a/crates/store_sqlite/src/addressbook_store/mod.rs b/crates/store_sqlite/src/addressbook_store/mod.rs index 04e5730..ff44464 100644 --- a/crates/store_sqlite/src/addressbook_store/mod.rs +++ b/crates/store_sqlite/src/addressbook_store/mod.rs @@ -9,7 +9,7 @@ use rustical_store::{ }; use sqlx::{Acquire, Executor, Sqlite, SqlitePool, Transaction}; use tokio::sync::mpsc::Sender; -use tracing::{error, instrument, warn}; +use tracing::{error_span, instrument, warn}; pub mod birthday_calendar; @@ -74,7 +74,7 @@ impl SqliteAddressbookStore { "Commiting orphaned addressbook object ({},{},{}), deleted={}", &row.principal, &row.addressbook_id, &row.id, &row.deleted ); - log_object_operation( + Self::log_object_operation( &mut tx, &row.principal, &row.addressbook_id, @@ -88,6 +88,57 @@ impl SqliteAddressbookStore { Ok(()) } + // Logs an operation to an address object + async fn log_object_operation( + tx: &mut Transaction<'_, Sqlite>, + principal: &str, + addressbook_id: &str, + object_id: &str, + operation: ChangeOperation, + ) -> Result { + struct Synctoken { + synctoken: i64, + } + let Synctoken { synctoken } = sqlx::query_as!( + Synctoken, + r#" + UPDATE addressbooks + SET synctoken = synctoken + 1 + WHERE (principal, id) = (?1, ?2) + RETURNING synctoken"#, + principal, + addressbook_id + ) + .fetch_one(&mut **tx) + .await + .map_err(crate::Error::from)?; + + sqlx::query!( + r#" + INSERT INTO addressobjectchangelog (principal, addressbook_id, object_id, "operation", synctoken) + VALUES (?1, ?2, ?3, ?4, ( + SELECT synctoken FROM addressbooks WHERE (principal, id) = (?1, ?2) + ))"#, + principal, + addressbook_id, + object_id, + operation + ) + .execute(&mut **tx) + .await + .map_err(crate::Error::from)?; + Ok(format_synctoken(synctoken)) + } + + fn send_push_notification(&self, data: CollectionOperationInfo, topic: String) { + if let Err(err) = self.sender.try_send(CollectionOperation { topic, data }) { + error_span!( + "Error trying to send addressbook update notification:", + err = format!("{err:?}"), + ); + } + } + async fn _get_addressbook<'e, E: Executor<'e, Database = Sqlite>>( executor: E, principal: &str, @@ -496,13 +547,8 @@ impl AddressbookStore for SqliteAddressbookStore { Self::_delete_addressbook(&mut *tx, principal, addressbook_id, use_trashbin).await?; tx.commit().await.map_err(crate::Error::from)?; - if let Some(addressbook) = addressbook - && let Err(err) = self.sender.try_send(CollectionOperation { - data: CollectionOperationInfo::Delete, - topic: addressbook.push_topic, - }) - { - error!("Push notification about deleted addressbook failed: {err}"); + if let Some(addressbook) = addressbook { + self.send_push_notification(CollectionOperationInfo::Delete, addressbook.push_topic); } Ok(()) @@ -588,7 +634,7 @@ impl AddressbookStore for SqliteAddressbookStore { Self::_put_object(&mut *tx, &principal, &addressbook_id, &object, overwrite).await?; - let sync_token = log_object_operation( + let sync_token = Self::log_object_operation( &mut tx, &principal, &addressbook_id, @@ -600,15 +646,12 @@ impl AddressbookStore for SqliteAddressbookStore { tx.commit().await.map_err(crate::Error::from)?; - if let Err(err) = self.sender.try_send(CollectionOperation { - data: CollectionOperationInfo::Content { sync_token }, - topic: self - .get_addressbook(&principal, &addressbook_id, false) + self.send_push_notification( + CollectionOperationInfo::Content { sync_token }, + self.get_addressbook(&principal, &addressbook_id, false) .await? .push_topic, - }) { - error!("Push notification about deleted addressbook failed: {err}"); - } + ); Ok(()) } @@ -629,7 +672,7 @@ impl AddressbookStore for SqliteAddressbookStore { Self::_delete_object(&mut *tx, principal, addressbook_id, object_id, use_trashbin).await?; - let sync_token = log_object_operation( + let sync_token = Self::log_object_operation( &mut tx, principal, addressbook_id, @@ -641,15 +684,12 @@ impl AddressbookStore for SqliteAddressbookStore { tx.commit().await.map_err(crate::Error::from)?; - if let Err(err) = self.sender.try_send(CollectionOperation { - data: CollectionOperationInfo::Content { sync_token }, - topic: self - .get_addressbook(principal, addressbook_id, false) + self.send_push_notification( + CollectionOperationInfo::Content { sync_token }, + self.get_addressbook(principal, addressbook_id, false) .await? .push_topic, - }) { - error!("Push notification about deleted addressbook failed: {err}"); - } + ); Ok(()) } @@ -668,7 +708,7 @@ impl AddressbookStore for SqliteAddressbookStore { Self::_restore_object(&mut *tx, principal, addressbook_id, object_id).await?; - let sync_token = log_object_operation( + let sync_token = Self::log_object_operation( &mut tx, principal, addressbook_id, @@ -679,15 +719,12 @@ impl AddressbookStore for SqliteAddressbookStore { .map_err(crate::Error::from)?; tx.commit().await.map_err(crate::Error::from)?; - if let Err(err) = self.sender.try_send(CollectionOperation { - data: CollectionOperationInfo::Content { sync_token }, - topic: self - .get_addressbook(principal, addressbook_id, false) + self.send_push_notification( + CollectionOperationInfo::Content { sync_token }, + self.get_addressbook(principal, addressbook_id, false) .await? .push_topic, - }) { - error!("Push notification about restored addressbook object failed: {err}"); - } + ); Ok(()) } @@ -732,7 +769,7 @@ impl AddressbookStore for SqliteAddressbookStore { .await?; sync_token = Some( - log_object_operation( + Self::log_object_operation( &mut tx, &addressbook.principal, &addressbook.id, @@ -744,59 +781,14 @@ impl AddressbookStore for SqliteAddressbookStore { } tx.commit().await.map_err(crate::Error::from)?; - if let Some(sync_token) = sync_token - && let Err(err) = self.sender.try_send(CollectionOperation { - data: CollectionOperationInfo::Content { sync_token }, - topic: self - .get_addressbook(&addressbook.principal, &addressbook.id, true) + if let Some(sync_token) = sync_token { + self.send_push_notification( + CollectionOperationInfo::Content { sync_token }, + self.get_addressbook(&addressbook.principal, &addressbook.id, true) .await? .push_topic, - }) - { - error!("Push notification about imported addressbook failed: {err}"); + ); } Ok(()) } } - -// Logs an operation to an address object -async fn log_object_operation( - tx: &mut Transaction<'_, Sqlite>, - principal: &str, - addressbook_id: &str, - object_id: &str, - operation: ChangeOperation, -) -> Result { - struct Synctoken { - synctoken: i64, - } - let Synctoken { synctoken } = sqlx::query_as!( - Synctoken, - r#" - UPDATE addressbooks - SET synctoken = synctoken + 1 - WHERE (principal, id) = (?1, ?2) - RETURNING synctoken"#, - principal, - addressbook_id - ) - .fetch_one(&mut **tx) - .await - .map_err(crate::Error::from)?; - - sqlx::query!( - r#" - INSERT INTO addressobjectchangelog (principal, addressbook_id, object_id, "operation", synctoken) - VALUES (?1, ?2, ?3, ?4, ( - SELECT synctoken FROM addressbooks WHERE (principal, id) = (?1, ?2) - ))"#, - principal, - addressbook_id, - object_id, - operation - ) - .execute(&mut **tx) - .await - .map_err(crate::Error::from)?; - Ok(format_synctoken(synctoken)) -} diff --git a/crates/store_sqlite/src/calendar_store.rs b/crates/store_sqlite/src/calendar_store.rs index 1787672..780ce3a 100644 --- a/crates/store_sqlite/src/calendar_store.rs +++ b/crates/store_sqlite/src/calendar_store.rs @@ -11,7 +11,7 @@ use rustical_store::{CollectionOperation, CollectionOperationInfo}; use sqlx::types::chrono::NaiveDateTime; use sqlx::{Acquire, Executor, Sqlite, SqlitePool, Transaction}; use tokio::sync::mpsc::Sender; -use tracing::{error, instrument, warn}; +use tracing::{error_span, instrument, warn}; #[derive(Debug, Clone)] struct CalendarObjectRow { @@ -94,6 +94,57 @@ pub struct SqliteCalendarStore { } impl SqliteCalendarStore { + // Logs an operation to the events + async fn log_object_operation( + tx: &mut Transaction<'_, Sqlite>, + principal: &str, + cal_id: &str, + object_id: &str, + operation: ChangeOperation, + ) -> Result { + struct Synctoken { + synctoken: i64, + } + let Synctoken { synctoken } = sqlx::query_as!( + Synctoken, + r#" + UPDATE calendars + SET synctoken = synctoken + 1 + WHERE (principal, id) = (?1, ?2) + RETURNING synctoken"#, + principal, + cal_id + ) + .fetch_one(&mut **tx) + .await + .map_err(crate::Error::from)?; + + sqlx::query!( + r#" + INSERT INTO calendarobjectchangelog (principal, cal_id, object_id, "operation", synctoken) + VALUES (?1, ?2, ?3, ?4, ( + SELECT synctoken FROM calendars WHERE (principal, id) = (?1, ?2) + ))"#, + principal, + cal_id, + object_id, + operation + ) + .execute(&mut **tx) + .await + .map_err(crate::Error::from)?; + Ok(format_synctoken(synctoken)) + } + + fn send_push_notification(&self, data: CollectionOperationInfo, topic: String) { + if let Err(err) = self.sender.try_send(CollectionOperation { topic, data }) { + error_span!( + "Error trying to send calendar update notification:", + err = format!("{err:?}"), + ); + } + } + // Commit "orphaned" objects to the changelog table pub async fn repair_orphans(&self) -> Result<(), Error> { struct Row { @@ -134,7 +185,8 @@ impl SqliteCalendarStore { "Commiting orphaned calendar object ({},{},{}), deleted={}", &row.principal, &row.cal_id, &row.id, &row.deleted ); - log_object_operation(&mut tx, &row.principal, &row.cal_id, &row.id, operation).await?; + Self::log_object_operation(&mut tx, &row.principal, &row.cal_id, &row.id, operation) + .await?; } tx.commit().await.map_err(crate::Error::from)?; @@ -605,13 +657,8 @@ impl CalendarStore for SqliteCalendarStore { Self::_delete_calendar(&mut *tx, principal, id, use_trashbin).await?; tx.commit().await.map_err(crate::Error::from)?; - if let Some(cal) = cal - && let Err(err) = self.sender.try_send(CollectionOperation { - data: CollectionOperationInfo::Delete, - topic: cal.push_topic, - }) - { - error!("Push notification about deleted calendar failed: {err}"); + if let Some(cal) = cal { + self.send_push_notification(CollectionOperationInfo::Delete, cal.push_topic); } Ok(()) } @@ -652,7 +699,7 @@ impl CalendarStore for SqliteCalendarStore { Self::_put_object(&mut *tx, &calendar.principal, &calendar.id, &object, false).await?; sync_token = Some( - log_object_operation( + Self::log_object_operation( &mut tx, &calendar.principal, &calendar.id, @@ -665,16 +712,13 @@ impl CalendarStore for SqliteCalendarStore { tx.commit().await.map_err(crate::Error::from)?; - if let Some(sync_token) = sync_token - && let Err(err) = self.sender.try_send(CollectionOperation { - data: CollectionOperationInfo::Content { sync_token }, - topic: self - .get_calendar(&calendar.principal, &calendar.id, true) + if let Some(sync_token) = sync_token { + self.send_push_notification( + CollectionOperationInfo::Content { sync_token }, + self.get_calendar(&calendar.principal, &calendar.id, true) .await? .push_topic, - }) - { - error!("Push notification about imported calendar failed: {err}"); + ); } Ok(()) } @@ -755,7 +799,7 @@ impl CalendarStore for SqliteCalendarStore { Self::_put_object(&mut *tx, &principal, &cal_id, &object, overwrite).await?; - let sync_token = log_object_operation( + let sync_token = Self::log_object_operation( &mut tx, &principal, &cal_id, @@ -766,15 +810,12 @@ impl CalendarStore for SqliteCalendarStore { tx.commit().await.map_err(crate::Error::from)?; - if let Err(err) = self.sender.try_send(CollectionOperation { - data: CollectionOperationInfo::Content { sync_token }, - topic: self - .get_calendar(&principal, &cal_id, true) + self.send_push_notification( + CollectionOperationInfo::Content { sync_token }, + self.get_calendar(&principal, &cal_id, true) .await? .push_topic, - }) { - error!("Push notification about deleted calendar failed: {err}"); - } + ); Ok(()) } @@ -795,15 +836,15 @@ impl CalendarStore for SqliteCalendarStore { Self::_delete_object(&mut *tx, principal, cal_id, id, use_trashbin).await?; let sync_token = - log_object_operation(&mut tx, principal, cal_id, id, ChangeOperation::Delete).await?; + Self::log_object_operation(&mut tx, principal, cal_id, id, ChangeOperation::Delete) + .await?; tx.commit().await.map_err(crate::Error::from)?; - if let Err(err) = self.sender.try_send(CollectionOperation { - data: CollectionOperationInfo::Content { sync_token }, - topic: self.get_calendar(principal, cal_id, true).await?.push_topic, - }) { - error!("Push notification about deleted calendar failed: {err}"); - } + self.send_push_notification( + CollectionOperationInfo::Content { sync_token }, + self.get_calendar(principal, cal_id, true).await?.push_topic, + ); + Ok(()) } @@ -823,16 +864,14 @@ impl CalendarStore for SqliteCalendarStore { Self::_restore_object(&mut *tx, principal, cal_id, object_id).await?; let sync_token = - log_object_operation(&mut tx, principal, cal_id, object_id, ChangeOperation::Add) + Self::log_object_operation(&mut tx, principal, cal_id, object_id, ChangeOperation::Add) .await?; tx.commit().await.map_err(crate::Error::from)?; - if let Err(err) = self.sender.try_send(CollectionOperation { - data: CollectionOperationInfo::Content { sync_token }, - topic: self.get_calendar(principal, cal_id, true).await?.push_topic, - }) { - error!("Push notification about restored calendar object failed: {err}"); - } + self.send_push_notification( + CollectionOperationInfo::Content { sync_token }, + self.get_calendar(principal, cal_id, true).await?.push_topic, + ); Ok(()) } @@ -850,46 +889,3 @@ impl CalendarStore for SqliteCalendarStore { false } } - -// Logs an operation to the events -// TODO: Log multiple updates -async fn log_object_operation( - tx: &mut Transaction<'_, Sqlite>, - principal: &str, - cal_id: &str, - object_id: &str, - operation: ChangeOperation, -) -> Result { - struct Synctoken { - synctoken: i64, - } - let Synctoken { synctoken } = sqlx::query_as!( - Synctoken, - r#" - UPDATE calendars - SET synctoken = synctoken + 1 - WHERE (principal, id) = (?1, ?2) - RETURNING synctoken"#, - principal, - cal_id - ) - .fetch_one(&mut **tx) - .await - .map_err(crate::Error::from)?; - - sqlx::query!( - r#" - INSERT INTO calendarobjectchangelog (principal, cal_id, object_id, "operation", synctoken) - VALUES (?1, ?2, ?3, ?4, ( - SELECT synctoken FROM calendars WHERE (principal, id) = (?1, ?2) - ))"#, - principal, - cal_id, - object_id, - operation - ) - .execute(&mut **tx) - .await - .map_err(crate::Error::from)?; - Ok(format_synctoken(synctoken)) -} diff --git a/crates/store_sqlite/src/subscription_store.rs b/crates/store_sqlite/src/subscription_store.rs index 79d5ac3..17e92d4 100644 --- a/crates/store_sqlite/src/subscription_store.rs +++ b/crates/store_sqlite/src/subscription_store.rs @@ -37,7 +37,7 @@ impl SubscriptionStore for SqliteStore { Err(err) => return Err(err), }; sqlx::query!( - r#"INSERT OR REPLACE INTO davpush_subscriptions (id, topic, expiration, push_resource, public_key, public_key_type, auth_secret) VALUES (?, ?, ?, ?, ?, ?, ?)"#, + r#"REPLACE INTO davpush_subscriptions (id, topic, expiration, push_resource, public_key, public_key_type, auth_secret) VALUES (?, ?, ?, ?, ?, ?, ?)"#, sub.id, sub.topic, sub.expiration,