sqlite_store: Refactor notification logic

This commit is contained in:
Lennart
2025-12-10 10:44:41 +01:00
parent d81074de3b
commit a686286d06
5 changed files with 168 additions and 180 deletions

View File

@@ -9,7 +9,7 @@ use rustical_store::{
};
use sqlx::{Acquire, Executor, Sqlite, SqlitePool, Transaction};
use tokio::sync::mpsc::Sender;
use tracing::{error, instrument, warn};
use tracing::{error_span, instrument, warn};
pub mod birthday_calendar;
@@ -74,7 +74,7 @@ impl SqliteAddressbookStore {
"Commiting orphaned addressbook object ({},{},{}), deleted={}",
&row.principal, &row.addressbook_id, &row.id, &row.deleted
);
log_object_operation(
Self::log_object_operation(
&mut tx,
&row.principal,
&row.addressbook_id,
@@ -88,6 +88,57 @@ impl SqliteAddressbookStore {
Ok(())
}
// Logs an operation to an address object
async fn log_object_operation(
tx: &mut Transaction<'_, Sqlite>,
principal: &str,
addressbook_id: &str,
object_id: &str,
operation: ChangeOperation,
) -> Result<String, Error> {
struct Synctoken {
synctoken: i64,
}
let Synctoken { synctoken } = sqlx::query_as!(
Synctoken,
r#"
UPDATE addressbooks
SET synctoken = synctoken + 1
WHERE (principal, id) = (?1, ?2)
RETURNING synctoken"#,
principal,
addressbook_id
)
.fetch_one(&mut **tx)
.await
.map_err(crate::Error::from)?;
sqlx::query!(
r#"
INSERT INTO addressobjectchangelog (principal, addressbook_id, object_id, "operation", synctoken)
VALUES (?1, ?2, ?3, ?4, (
SELECT synctoken FROM addressbooks WHERE (principal, id) = (?1, ?2)
))"#,
principal,
addressbook_id,
object_id,
operation
)
.execute(&mut **tx)
.await
.map_err(crate::Error::from)?;
Ok(format_synctoken(synctoken))
}
fn send_push_notification(&self, data: CollectionOperationInfo, topic: String) {
if let Err(err) = self.sender.try_send(CollectionOperation { topic, data }) {
error_span!(
"Error trying to send addressbook update notification:",
err = format!("{err:?}"),
);
}
}
async fn _get_addressbook<'e, E: Executor<'e, Database = Sqlite>>(
executor: E,
principal: &str,
@@ -496,13 +547,8 @@ impl AddressbookStore for SqliteAddressbookStore {
Self::_delete_addressbook(&mut *tx, principal, addressbook_id, use_trashbin).await?;
tx.commit().await.map_err(crate::Error::from)?;
if let Some(addressbook) = addressbook
&& let Err(err) = self.sender.try_send(CollectionOperation {
data: CollectionOperationInfo::Delete,
topic: addressbook.push_topic,
})
{
error!("Push notification about deleted addressbook failed: {err}");
if let Some(addressbook) = addressbook {
self.send_push_notification(CollectionOperationInfo::Delete, addressbook.push_topic);
}
Ok(())
@@ -588,7 +634,7 @@ impl AddressbookStore for SqliteAddressbookStore {
Self::_put_object(&mut *tx, &principal, &addressbook_id, &object, overwrite).await?;
let sync_token = log_object_operation(
let sync_token = Self::log_object_operation(
&mut tx,
&principal,
&addressbook_id,
@@ -600,15 +646,12 @@ impl AddressbookStore for SqliteAddressbookStore {
tx.commit().await.map_err(crate::Error::from)?;
if let Err(err) = self.sender.try_send(CollectionOperation {
data: CollectionOperationInfo::Content { sync_token },
topic: self
.get_addressbook(&principal, &addressbook_id, false)
self.send_push_notification(
CollectionOperationInfo::Content { sync_token },
self.get_addressbook(&principal, &addressbook_id, false)
.await?
.push_topic,
}) {
error!("Push notification about deleted addressbook failed: {err}");
}
);
Ok(())
}
@@ -629,7 +672,7 @@ impl AddressbookStore for SqliteAddressbookStore {
Self::_delete_object(&mut *tx, principal, addressbook_id, object_id, use_trashbin).await?;
let sync_token = log_object_operation(
let sync_token = Self::log_object_operation(
&mut tx,
principal,
addressbook_id,
@@ -641,15 +684,12 @@ impl AddressbookStore for SqliteAddressbookStore {
tx.commit().await.map_err(crate::Error::from)?;
if let Err(err) = self.sender.try_send(CollectionOperation {
data: CollectionOperationInfo::Content { sync_token },
topic: self
.get_addressbook(principal, addressbook_id, false)
self.send_push_notification(
CollectionOperationInfo::Content { sync_token },
self.get_addressbook(principal, addressbook_id, false)
.await?
.push_topic,
}) {
error!("Push notification about deleted addressbook failed: {err}");
}
);
Ok(())
}
@@ -668,7 +708,7 @@ impl AddressbookStore for SqliteAddressbookStore {
Self::_restore_object(&mut *tx, principal, addressbook_id, object_id).await?;
let sync_token = log_object_operation(
let sync_token = Self::log_object_operation(
&mut tx,
principal,
addressbook_id,
@@ -679,15 +719,12 @@ impl AddressbookStore for SqliteAddressbookStore {
.map_err(crate::Error::from)?;
tx.commit().await.map_err(crate::Error::from)?;
if let Err(err) = self.sender.try_send(CollectionOperation {
data: CollectionOperationInfo::Content { sync_token },
topic: self
.get_addressbook(principal, addressbook_id, false)
self.send_push_notification(
CollectionOperationInfo::Content { sync_token },
self.get_addressbook(principal, addressbook_id, false)
.await?
.push_topic,
}) {
error!("Push notification about restored addressbook object failed: {err}");
}
);
Ok(())
}
@@ -732,7 +769,7 @@ impl AddressbookStore for SqliteAddressbookStore {
.await?;
sync_token = Some(
log_object_operation(
Self::log_object_operation(
&mut tx,
&addressbook.principal,
&addressbook.id,
@@ -744,59 +781,14 @@ impl AddressbookStore for SqliteAddressbookStore {
}
tx.commit().await.map_err(crate::Error::from)?;
if let Some(sync_token) = sync_token
&& let Err(err) = self.sender.try_send(CollectionOperation {
data: CollectionOperationInfo::Content { sync_token },
topic: self
.get_addressbook(&addressbook.principal, &addressbook.id, true)
if let Some(sync_token) = sync_token {
self.send_push_notification(
CollectionOperationInfo::Content { sync_token },
self.get_addressbook(&addressbook.principal, &addressbook.id, true)
.await?
.push_topic,
})
{
error!("Push notification about imported addressbook failed: {err}");
);
}
Ok(())
}
}
// Logs an operation to an address object
async fn log_object_operation(
tx: &mut Transaction<'_, Sqlite>,
principal: &str,
addressbook_id: &str,
object_id: &str,
operation: ChangeOperation,
) -> Result<String, Error> {
struct Synctoken {
synctoken: i64,
}
let Synctoken { synctoken } = sqlx::query_as!(
Synctoken,
r#"
UPDATE addressbooks
SET synctoken = synctoken + 1
WHERE (principal, id) = (?1, ?2)
RETURNING synctoken"#,
principal,
addressbook_id
)
.fetch_one(&mut **tx)
.await
.map_err(crate::Error::from)?;
sqlx::query!(
r#"
INSERT INTO addressobjectchangelog (principal, addressbook_id, object_id, "operation", synctoken)
VALUES (?1, ?2, ?3, ?4, (
SELECT synctoken FROM addressbooks WHERE (principal, id) = (?1, ?2)
))"#,
principal,
addressbook_id,
object_id,
operation
)
.execute(&mut **tx)
.await
.map_err(crate::Error::from)?;
Ok(format_synctoken(synctoken))
}