mirror of
https://github.com/lennart-k/rustical.git
synced 2025-12-14 08:12:24 +00:00
Move DAV Push logic to dav crate
This commit is contained in:
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -2748,7 +2748,6 @@ dependencies = [
|
|||||||
"rustical_frontend",
|
"rustical_frontend",
|
||||||
"rustical_store",
|
"rustical_store",
|
||||||
"rustical_store_sqlite",
|
"rustical_store_sqlite",
|
||||||
"rustical_xml",
|
|
||||||
"serde",
|
"serde",
|
||||||
"sqlx",
|
"sqlx",
|
||||||
"tokio",
|
"tokio",
|
||||||
@@ -2823,11 +2822,13 @@ dependencies = [
|
|||||||
"itertools 0.14.0",
|
"itertools 0.14.0",
|
||||||
"log",
|
"log",
|
||||||
"quick-xml",
|
"quick-xml",
|
||||||
|
"reqwest",
|
||||||
"rustical_store",
|
"rustical_store",
|
||||||
"rustical_xml",
|
"rustical_xml",
|
||||||
"serde",
|
"serde",
|
||||||
"strum",
|
"strum",
|
||||||
"thiserror 2.0.11",
|
"thiserror 2.0.11",
|
||||||
|
"tokio",
|
||||||
"tracing",
|
"tracing",
|
||||||
"tracing-actix-web",
|
"tracing-actix-web",
|
||||||
]
|
]
|
||||||
|
|||||||
13
Cargo.toml
13
Cargo.toml
@@ -100,6 +100,12 @@ quote = "1.0"
|
|||||||
proc-macro2 = "1.0"
|
proc-macro2 = "1.0"
|
||||||
heck = "0.5"
|
heck = "0.5"
|
||||||
darling = "0.20"
|
darling = "0.20"
|
||||||
|
reqwest = { version = "0.12", features = [
|
||||||
|
"rustls-tls",
|
||||||
|
"charset",
|
||||||
|
"http2",
|
||||||
|
], default-features = false }
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
rustical_store = { workspace = true }
|
rustical_store = { workspace = true }
|
||||||
rustical_store_sqlite = { workspace = true }
|
rustical_store_sqlite = { workspace = true }
|
||||||
@@ -134,11 +140,6 @@ rpassword.workspace = true
|
|||||||
argon2.workspace = true
|
argon2.workspace = true
|
||||||
pbkdf2.workspace = true
|
pbkdf2.workspace = true
|
||||||
password-hash.workspace = true
|
password-hash.workspace = true
|
||||||
reqwest = { version = "0.12", features = [
|
reqwest.workspace = true
|
||||||
"rustls-tls",
|
|
||||||
"charset",
|
|
||||||
"http2",
|
|
||||||
], default-features = false }
|
|
||||||
rustical_xml.workspace = true
|
|
||||||
rustical_dav.workspace = true
|
rustical_dav.workspace = true
|
||||||
quick-xml.workspace = true
|
quick-xml.workspace = true
|
||||||
|
|||||||
@@ -21,3 +21,5 @@ log = { workspace = true }
|
|||||||
derive_more = { workspace = true }
|
derive_more = { workspace = true }
|
||||||
tracing = { workspace = true }
|
tracing = { workspace = true }
|
||||||
tracing-actix-web = { workspace = true }
|
tracing-actix-web = { workspace = true }
|
||||||
|
reqwest.workspace = true
|
||||||
|
tokio.workspace = true
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
mod prop;
|
mod prop;
|
||||||
|
mod push_notifier;
|
||||||
mod push_register;
|
mod push_register;
|
||||||
|
|
||||||
pub use prop::*;
|
pub use prop::*;
|
||||||
|
pub use push_notifier::push_notifier;
|
||||||
pub use push_register::*;
|
pub use push_register::*;
|
||||||
|
|||||||
68
crates/dav/src/push/push_notifier.rs
Normal file
68
crates/dav/src/push/push_notifier.rs
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
use crate::xml::multistatus::PropstatElement;
|
||||||
|
use actix_web::http::StatusCode;
|
||||||
|
use rustical_store::{CollectionOperation, CollectionOperationType, SubscriptionStore};
|
||||||
|
use rustical_xml::{XmlRootTag, XmlSerialize, XmlSerializeRoot};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::mpsc::Receiver;
|
||||||
|
use tracing::{error, info};
|
||||||
|
|
||||||
|
#[derive(XmlSerialize, Debug)]
|
||||||
|
struct PushMessageProp {
|
||||||
|
#[xml(ns = "crate::namespace::NS_DAV")]
|
||||||
|
topic: String,
|
||||||
|
#[xml(ns = "crate::namespace::NS_DAV")]
|
||||||
|
sync_token: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(XmlSerialize, XmlRootTag, Debug)]
|
||||||
|
#[xml(root = b"push-message", ns = "crate::namespace::NS_DAVPUSH")]
|
||||||
|
#[xml(ns_prefix(crate::namespace::NS_DAVPUSH = b"", crate::namespace::NS_DAV = b"D",))]
|
||||||
|
struct PushMessage {
|
||||||
|
#[xml(ns = "crate::namespace::NS_DAV")]
|
||||||
|
propstat: PropstatElement<PushMessageProp>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn push_notifier(
|
||||||
|
mut recv: Receiver<CollectionOperation>,
|
||||||
|
sub_store: Arc<impl SubscriptionStore>,
|
||||||
|
) {
|
||||||
|
while let Some(message) = recv.recv().await {
|
||||||
|
if let Ok(subscribers) = sub_store.get_subscriptions(&message.topic).await {
|
||||||
|
let status = match message.r#type {
|
||||||
|
CollectionOperationType::Object => StatusCode::OK,
|
||||||
|
CollectionOperationType::Delete => StatusCode::NOT_FOUND,
|
||||||
|
};
|
||||||
|
let push_message = PushMessage {
|
||||||
|
propstat: PropstatElement {
|
||||||
|
prop: PushMessageProp {
|
||||||
|
topic: message.topic,
|
||||||
|
sync_token: message.sync_token,
|
||||||
|
},
|
||||||
|
status,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
let mut output: Vec<_> = b"<?xml version=\"1.0\" encoding=\"utf-8\"?>\n".into();
|
||||||
|
let mut writer = quick_xml::Writer::new_with_indent(&mut output, b' ', 4);
|
||||||
|
if let Err(err) = push_message.serialize_root(&mut writer) {
|
||||||
|
error!("Could not serialize push message: {}", err);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let payload = String::from_utf8(output).unwrap();
|
||||||
|
for subscriber in subscribers {
|
||||||
|
info!(
|
||||||
|
"Sending a push message to {}: {}",
|
||||||
|
subscriber.push_resource, payload
|
||||||
|
);
|
||||||
|
let client = reqwest::Client::new();
|
||||||
|
if let Err(err) = client
|
||||||
|
.post(subscriber.push_resource)
|
||||||
|
.body(payload.to_owned())
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
error!("{err}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
78
src/main.rs
78
src/main.rs
@@ -1,23 +1,21 @@
|
|||||||
use crate::config::Config;
|
use crate::config::Config;
|
||||||
use actix_web::http::{KeepAlive, StatusCode};
|
use actix_web::http::KeepAlive;
|
||||||
use actix_web::HttpServer;
|
use actix_web::HttpServer;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use app::make_app;
|
use app::make_app;
|
||||||
use clap::{Parser, Subcommand};
|
use clap::{Parser, Subcommand};
|
||||||
use commands::{cmd_gen_config, cmd_pwhash};
|
use commands::{cmd_gen_config, cmd_pwhash};
|
||||||
use config::{DataStoreConfig, SqliteDataStoreConfig};
|
use config::{DataStoreConfig, SqliteDataStoreConfig};
|
||||||
use rustical_dav::xml::multistatus::PropstatElement;
|
use rustical_dav::push::push_notifier;
|
||||||
use rustical_store::auth::StaticUserStore;
|
use rustical_store::auth::StaticUserStore;
|
||||||
use rustical_store::{AddressbookStore, CalendarStore, CollectionOperation, SubscriptionStore};
|
use rustical_store::{AddressbookStore, CalendarStore, CollectionOperation, SubscriptionStore};
|
||||||
use rustical_store_sqlite::addressbook_store::SqliteAddressbookStore;
|
use rustical_store_sqlite::addressbook_store::SqliteAddressbookStore;
|
||||||
use rustical_store_sqlite::calendar_store::SqliteCalendarStore;
|
use rustical_store_sqlite::calendar_store::SqliteCalendarStore;
|
||||||
use rustical_store_sqlite::{create_db_pool, SqliteStore};
|
use rustical_store_sqlite::{create_db_pool, SqliteStore};
|
||||||
use rustical_xml::{XmlRootTag, XmlSerialize, XmlSerializeRoot};
|
|
||||||
use setup_tracing::setup_tracing;
|
use setup_tracing::setup_tracing;
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::mpsc::Receiver;
|
use tokio::sync::mpsc::Receiver;
|
||||||
use tracing::{error, info};
|
|
||||||
|
|
||||||
mod app;
|
mod app;
|
||||||
mod commands;
|
mod commands;
|
||||||
@@ -65,27 +63,6 @@ async fn get_data_stores(
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Move this code somewhere else :)
|
|
||||||
|
|
||||||
#[derive(XmlSerialize, Debug)]
|
|
||||||
struct PushMessageProp {
|
|
||||||
#[xml(ns = "rustical_dav::namespace::NS_DAV")]
|
|
||||||
topic: String,
|
|
||||||
#[xml(ns = "rustical_dav::namespace::NS_DAV")]
|
|
||||||
sync_token: Option<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(XmlSerialize, XmlRootTag, Debug)]
|
|
||||||
#[xml(root = b"push-message", ns = "rustical_dav::namespace::NS_DAVPUSH")]
|
|
||||||
#[xml(ns_prefix(
|
|
||||||
rustical_dav::namespace::NS_DAVPUSH = b"",
|
|
||||||
rustical_dav::namespace::NS_DAV = b"D",
|
|
||||||
))]
|
|
||||||
struct PushMessage {
|
|
||||||
#[xml(ns = "rustical_dav::namespace::NS_DAV")]
|
|
||||||
propstat: PropstatElement<PushMessageProp>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<()> {
|
async fn main() -> Result<()> {
|
||||||
let args = Args::parse();
|
let args = Args::parse();
|
||||||
@@ -102,57 +79,10 @@ async fn main() -> Result<()> {
|
|||||||
|
|
||||||
setup_tracing(&config.tracing);
|
setup_tracing(&config.tracing);
|
||||||
|
|
||||||
let (addr_store, cal_store, subscription_store, mut update_recv) =
|
let (addr_store, cal_store, subscription_store, update_recv) =
|
||||||
get_data_stores(!args.no_migrations, &config.data_store).await?;
|
get_data_stores(!args.no_migrations, &config.data_store).await?;
|
||||||
|
|
||||||
let subscription_store_clone = subscription_store.clone();
|
tokio::spawn(push_notifier(update_recv, subscription_store.clone()));
|
||||||
tokio::spawn(async move {
|
|
||||||
let subscription_store = subscription_store_clone.clone();
|
|
||||||
while let Some(message) = update_recv.recv().await {
|
|
||||||
if let Ok(subscribers) =
|
|
||||||
subscription_store.get_subscriptions(&message.topic).await
|
|
||||||
{
|
|
||||||
let status = match message.r#type {
|
|
||||||
rustical_store::CollectionOperationType::Object => StatusCode::OK,
|
|
||||||
rustical_store::CollectionOperationType::Delete => {
|
|
||||||
StatusCode::NOT_FOUND
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let push_message = PushMessage {
|
|
||||||
propstat: PropstatElement {
|
|
||||||
prop: PushMessageProp {
|
|
||||||
topic: message.topic,
|
|
||||||
sync_token: message.sync_token,
|
|
||||||
},
|
|
||||||
status,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
let mut output: Vec<_> =
|
|
||||||
b"<?xml version=\"1.0\" encoding=\"utf-8\"?>\n".into();
|
|
||||||
let mut writer = quick_xml::Writer::new_with_indent(&mut output, b' ', 4);
|
|
||||||
if let Err(err) = push_message.serialize_root(&mut writer) {
|
|
||||||
error!("Could not serialize push message: {}", err);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
let payload = String::from_utf8(output).unwrap();
|
|
||||||
for subscriber in subscribers {
|
|
||||||
info!(
|
|
||||||
"Sending a push message to {}: {}",
|
|
||||||
subscriber.push_resource, payload
|
|
||||||
);
|
|
||||||
let client = reqwest::Client::new();
|
|
||||||
if let Err(err) = client
|
|
||||||
.post(subscriber.push_resource)
|
|
||||||
.body(payload.to_owned())
|
|
||||||
.send()
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
error!("{err}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let user_store = Arc::new(match config.auth {
|
let user_store = Arc::new(match config.auth {
|
||||||
config::AuthConfig::Static(config) => StaticUserStore::new(config),
|
config::AuthConfig::Static(config) => StaticUserStore::new(config),
|
||||||
|
|||||||
Reference in New Issue
Block a user