From 658e6ebc1bfb166e7668535cd995469ff1cff048 Mon Sep 17 00:00:00 2001 From: Lennart <18233294+lennart-k@users.noreply.github.com> Date: Wed, 15 Jan 2025 17:40:23 +0100 Subject: [PATCH] Move DAV Push logic to dav crate --- Cargo.lock | 3 +- Cargo.toml | 13 ++--- crates/dav/Cargo.toml | 2 + crates/dav/src/push/mod.rs | 2 + crates/dav/src/push/push_notifier.rs | 68 ++++++++++++++++++++++++ src/main.rs | 78 ++-------------------------- 6 files changed, 85 insertions(+), 81 deletions(-) create mode 100644 crates/dav/src/push/push_notifier.rs diff --git a/Cargo.lock b/Cargo.lock index 0209598..ead9a69 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2748,7 +2748,6 @@ dependencies = [ "rustical_frontend", "rustical_store", "rustical_store_sqlite", - "rustical_xml", "serde", "sqlx", "tokio", @@ -2823,11 +2822,13 @@ dependencies = [ "itertools 0.14.0", "log", "quick-xml", + "reqwest", "rustical_store", "rustical_xml", "serde", "strum", "thiserror 2.0.11", + "tokio", "tracing", "tracing-actix-web", ] diff --git a/Cargo.toml b/Cargo.toml index b0112e0..55053ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -100,6 +100,12 @@ quote = "1.0" proc-macro2 = "1.0" heck = "0.5" darling = "0.20" +reqwest = { version = "0.12", features = [ + "rustls-tls", + "charset", + "http2", +], default-features = false } + [dependencies] rustical_store = { workspace = true } rustical_store_sqlite = { workspace = true } @@ -134,11 +140,6 @@ rpassword.workspace = true argon2.workspace = true pbkdf2.workspace = true password-hash.workspace = true -reqwest = { version = "0.12", features = [ - "rustls-tls", - "charset", - "http2", -], default-features = false } -rustical_xml.workspace = true +reqwest.workspace = true rustical_dav.workspace = true quick-xml.workspace = true diff --git a/crates/dav/Cargo.toml b/crates/dav/Cargo.toml index 5e29b61..17dc258 100644 --- a/crates/dav/Cargo.toml +++ b/crates/dav/Cargo.toml @@ -21,3 +21,5 @@ log = { workspace = true } derive_more = { workspace = true } tracing = { workspace = true } tracing-actix-web = { workspace = true } +reqwest.workspace = true +tokio.workspace = true diff --git a/crates/dav/src/push/mod.rs b/crates/dav/src/push/mod.rs index 4a1c4eb..fb0824a 100644 --- a/crates/dav/src/push/mod.rs +++ b/crates/dav/src/push/mod.rs @@ -1,5 +1,7 @@ mod prop; +mod push_notifier; mod push_register; pub use prop::*; +pub use push_notifier::push_notifier; pub use push_register::*; diff --git a/crates/dav/src/push/push_notifier.rs b/crates/dav/src/push/push_notifier.rs new file mode 100644 index 0000000..8895c71 --- /dev/null +++ b/crates/dav/src/push/push_notifier.rs @@ -0,0 +1,68 @@ +use crate::xml::multistatus::PropstatElement; +use actix_web::http::StatusCode; +use rustical_store::{CollectionOperation, CollectionOperationType, SubscriptionStore}; +use rustical_xml::{XmlRootTag, XmlSerialize, XmlSerializeRoot}; +use std::sync::Arc; +use tokio::sync::mpsc::Receiver; +use tracing::{error, info}; + +#[derive(XmlSerialize, Debug)] +struct PushMessageProp { + #[xml(ns = "crate::namespace::NS_DAV")] + topic: String, + #[xml(ns = "crate::namespace::NS_DAV")] + sync_token: Option, +} + +#[derive(XmlSerialize, XmlRootTag, Debug)] +#[xml(root = b"push-message", ns = "crate::namespace::NS_DAVPUSH")] +#[xml(ns_prefix(crate::namespace::NS_DAVPUSH = b"", crate::namespace::NS_DAV = b"D",))] +struct PushMessage { + #[xml(ns = "crate::namespace::NS_DAV")] + propstat: PropstatElement, +} + +pub async fn push_notifier( + mut recv: Receiver, + sub_store: Arc, +) { + while let Some(message) = recv.recv().await { + if let Ok(subscribers) = sub_store.get_subscriptions(&message.topic).await { + let status = match message.r#type { + CollectionOperationType::Object => StatusCode::OK, + CollectionOperationType::Delete => StatusCode::NOT_FOUND, + }; + let push_message = PushMessage { + propstat: PropstatElement { + prop: PushMessageProp { + topic: message.topic, + sync_token: message.sync_token, + }, + status, + }, + }; + let mut output: Vec<_> = b"\n".into(); + let mut writer = quick_xml::Writer::new_with_indent(&mut output, b' ', 4); + if let Err(err) = push_message.serialize_root(&mut writer) { + error!("Could not serialize push message: {}", err); + continue; + } + let payload = String::from_utf8(output).unwrap(); + for subscriber in subscribers { + info!( + "Sending a push message to {}: {}", + subscriber.push_resource, payload + ); + let client = reqwest::Client::new(); + if let Err(err) = client + .post(subscriber.push_resource) + .body(payload.to_owned()) + .send() + .await + { + error!("{err}"); + } + } + } + } +} diff --git a/src/main.rs b/src/main.rs index 20617dc..d6e22d7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,23 +1,21 @@ use crate::config::Config; -use actix_web::http::{KeepAlive, StatusCode}; +use actix_web::http::KeepAlive; use actix_web::HttpServer; use anyhow::Result; use app::make_app; use clap::{Parser, Subcommand}; use commands::{cmd_gen_config, cmd_pwhash}; use config::{DataStoreConfig, SqliteDataStoreConfig}; -use rustical_dav::xml::multistatus::PropstatElement; +use rustical_dav::push::push_notifier; use rustical_store::auth::StaticUserStore; use rustical_store::{AddressbookStore, CalendarStore, CollectionOperation, SubscriptionStore}; use rustical_store_sqlite::addressbook_store::SqliteAddressbookStore; use rustical_store_sqlite::calendar_store::SqliteCalendarStore; use rustical_store_sqlite::{create_db_pool, SqliteStore}; -use rustical_xml::{XmlRootTag, XmlSerialize, XmlSerializeRoot}; use setup_tracing::setup_tracing; use std::fs; use std::sync::Arc; use tokio::sync::mpsc::Receiver; -use tracing::{error, info}; mod app; mod commands; @@ -65,27 +63,6 @@ async fn get_data_stores( }) } -// TODO: Move this code somewhere else :) - -#[derive(XmlSerialize, Debug)] -struct PushMessageProp { - #[xml(ns = "rustical_dav::namespace::NS_DAV")] - topic: String, - #[xml(ns = "rustical_dav::namespace::NS_DAV")] - sync_token: Option, -} - -#[derive(XmlSerialize, XmlRootTag, Debug)] -#[xml(root = b"push-message", ns = "rustical_dav::namespace::NS_DAVPUSH")] -#[xml(ns_prefix( - rustical_dav::namespace::NS_DAVPUSH = b"", - rustical_dav::namespace::NS_DAV = b"D", -))] -struct PushMessage { - #[xml(ns = "rustical_dav::namespace::NS_DAV")] - propstat: PropstatElement, -} - #[tokio::main] async fn main() -> Result<()> { let args = Args::parse(); @@ -102,57 +79,10 @@ async fn main() -> Result<()> { setup_tracing(&config.tracing); - let (addr_store, cal_store, subscription_store, mut update_recv) = + let (addr_store, cal_store, subscription_store, update_recv) = get_data_stores(!args.no_migrations, &config.data_store).await?; - let subscription_store_clone = subscription_store.clone(); - tokio::spawn(async move { - let subscription_store = subscription_store_clone.clone(); - while let Some(message) = update_recv.recv().await { - if let Ok(subscribers) = - subscription_store.get_subscriptions(&message.topic).await - { - let status = match message.r#type { - rustical_store::CollectionOperationType::Object => StatusCode::OK, - rustical_store::CollectionOperationType::Delete => { - StatusCode::NOT_FOUND - } - }; - let push_message = PushMessage { - propstat: PropstatElement { - prop: PushMessageProp { - topic: message.topic, - sync_token: message.sync_token, - }, - status, - }, - }; - let mut output: Vec<_> = - b"\n".into(); - let mut writer = quick_xml::Writer::new_with_indent(&mut output, b' ', 4); - if let Err(err) = push_message.serialize_root(&mut writer) { - error!("Could not serialize push message: {}", err); - continue; - } - let payload = String::from_utf8(output).unwrap(); - for subscriber in subscribers { - info!( - "Sending a push message to {}: {}", - subscriber.push_resource, payload - ); - let client = reqwest::Client::new(); - if let Err(err) = client - .post(subscriber.push_resource) - .body(payload.to_owned()) - .send() - .await - { - error!("{err}"); - } - } - } - } - }); + tokio::spawn(push_notifier(update_recv, subscription_store.clone())); let user_store = Arc::new(match config.auth { config::AuthConfig::Static(config) => StaticUserStore::new(config),