split store and store_sqlite implementatio into multiple crates

This commit is contained in:
Lennart
2024-10-28 15:34:20 +01:00
parent 53d2ea10e6
commit c013ffa117
13 changed files with 213 additions and 107 deletions

View File

@@ -11,15 +11,10 @@ anyhow = { workspace = true }
async-trait = { workspace = true }
serde = { workspace = true }
sha2 = { workspace = true }
sqlx = { workspace = true }
tokio = { workspace = true }
toml = { workspace = true }
ical = { workspace = true }
chrono = { workspace = true }
regex = { workspace = true }
lazy_static = { workspace = true }
rstest = { workspace = true }
rstest_reuse = { workspace = true }
thiserror = { workspace = true }
password-auth = { workspace = true }
actix-web = { workspace = true }
@@ -27,6 +22,9 @@ actix-session = { workspace = true }
actix-web-httpauth = { workspace = true }
tracing = { workspace = true }
pbkdf2 = { workspace = true }
rand_core = { workspace = true }
chrono-tz = { workspace = true }
derive_more = { workspace = true }
[dev-dependencies]
rstest = { workspace = true }
rstest_reuse = { workspace = true }

View File

@@ -1,37 +0,0 @@
CREATE TABLE calendars (
principal TEXT NOT NULL,
id TEXT NOT NULL,
synctoken INTEGER DEFAULT 0 NOT NULL,
displayname TEXT,
description TEXT,
"order" INT DEFAULT 0 NOT NULL,
color TEXT,
timezone TEXT NOT NULL,
deleted_at DATETIME,
PRIMARY KEY (principal, id)
);
CREATE TABLE calendarobjects (
principal TEXT NOT NULL,
cal_id TEXT NOT NULL,
id TEXT NOT NULL,
ics TEXT NOT NULL,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
deleted_at DATETIME,
PRIMARY KEY (principal, cal_id, id),
FOREIGN KEY (principal, cal_id)
REFERENCES calendars (principal, id) ON DELETE CASCADE
);
CREATE TABLE calendarobjectchangelog (
-- The actual sync token is the SQLite field 'ROWID'
principal TEXT NOT NULL,
cal_id TEXT NOT NULL,
object_id TEXT NOT NULL,
operation INTEGER NOT NULL,
synctoken INTEGER DEFAULT 0 NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (principal, cal_id, created_at),
FOREIGN KEY (principal, cal_id)
REFERENCES calendars (principal, id) ON DELETE CASCADE
);

View File

@@ -1,34 +0,0 @@
CREATE TABLE addressbooks (
principal TEXT NOT NULL,
id TEXT NOT NULL,
synctoken INTEGER DEFAULT 0 NOT NULL,
displayname TEXT,
description TEXT,
deleted_at DATETIME,
PRIMARY KEY (principal, id)
);
CREATE TABLE addressobjects (
principal TEXT NOT NULL,
addressbook_id TEXT NOT NULL,
id TEXT NOT NULL,
vcf TEXT NOT NULL,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
deleted_at DATETIME,
PRIMARY KEY (principal, addressbook_id, id),
FOREIGN KEY (principal, addressbook_id)
REFERENCES addressbooks (principal, id) ON DELETE CASCADE
);
CREATE TABLE addressobjectchangelog (
-- The actual sync token is the SQLite field 'ROWID'
principal TEXT NOT NULL,
addressbook_id TEXT NOT NULL,
object_id TEXT NOT NULL,
operation INTEGER NOT NULL,
synctoken INTEGER DEFAULT 0 NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (principal, addressbook_id, created_at),
FOREIGN KEY (principal, addressbook_id)
REFERENCES addressbooks (principal, id) ON DELETE CASCADE
);

View File

@@ -1,4 +1,5 @@
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Not found")]
NotFound,
@@ -9,21 +10,9 @@ pub enum Error {
#[error("Invalid ics/vcf input: {0}")]
InvalidData(String),
#[error(transparent)]
SqlxError(sqlx::Error),
#[error(transparent)]
Other(#[from] anyhow::Error),
#[error(transparent)]
ParserError(#[from] ical::parser::ParserError),
}
impl From<sqlx::Error> for Error {
fn from(value: sqlx::Error) -> Self {
match value {
sqlx::Error::RowNotFound => Error::NotFound,
err => Error::SqlxError(err),
}
}
}

View File

@@ -2,7 +2,6 @@ pub mod addressbook_store;
pub mod calendar_store;
pub mod error;
pub mod model;
pub mod sqlite_store;
pub mod timestamp;
pub use error::Error;
pub mod auth;

View File

@@ -1,364 +0,0 @@
use super::{ChangeOperation, SqliteStore};
use crate::Error;
use crate::{
model::{AddressObject, Addressbook},
AddressbookStore,
};
use async_trait::async_trait;
use sqlx::{Sqlite, Transaction};
use tracing::instrument;
#[derive(Debug, Clone)]
struct AddressObjectRow {
id: String,
vcf: String,
}
impl TryFrom<AddressObjectRow> for AddressObject {
type Error = Error;
fn try_from(value: AddressObjectRow) -> Result<Self, Error> {
Self::from_vcf(value.id, value.vcf)
}
}
// Logs an operation to the events
async fn log_object_operation(
tx: &mut Transaction<'_, Sqlite>,
principal: &str,
addressbook_id: &str,
object_id: &str,
operation: ChangeOperation,
) -> Result<(), Error> {
sqlx::query!(
r#"
UPDATE addressbooks
SET synctoken = synctoken + 1
WHERE (principal, id) = (?1, ?2)"#,
principal,
addressbook_id
)
.execute(&mut **tx)
.await?;
sqlx::query!(
r#"
INSERT INTO addressobjectchangelog (principal, addressbook_id, object_id, operation, synctoken)
VALUES (?1, ?2, ?3, ?4, (
SELECT synctoken FROM addressbooks WHERE (principal, id) = (?1, ?2)
))"#,
principal,
addressbook_id,
object_id,
operation
)
.execute(&mut **tx)
.await?;
Ok(())
}
#[async_trait]
impl AddressbookStore for SqliteStore {
#[instrument]
async fn get_addressbook(&self, principal: &str, id: &str) -> Result<Addressbook, Error> {
let addressbook = sqlx::query_as!(
Addressbook,
r#"SELECT principal, id, synctoken, displayname, description, deleted_at
FROM addressbooks
WHERE (principal, id) = (?, ?)"#,
principal,
id
)
.fetch_one(&self.db)
.await?;
Ok(addressbook)
}
#[instrument]
async fn get_addressbooks(&self, principal: &str) -> Result<Vec<Addressbook>, Error> {
let addressbooks = sqlx::query_as!(
Addressbook,
r#"SELECT principal, id, synctoken, displayname, description, deleted_at
FROM addressbooks
WHERE principal = ? AND deleted_at IS NULL"#,
principal
)
.fetch_all(&self.db)
.await?;
Ok(addressbooks)
}
#[instrument]
async fn update_addressbook(
&self,
principal: String,
id: String,
addressbook: Addressbook,
) -> Result<(), Error> {
let result = sqlx::query!(
r#"UPDATE addressbooks SET principal = ?, id = ?, displayname = ?, description = ?
WHERE (principal, id) = (?, ?)"#,
addressbook.principal,
addressbook.id,
addressbook.displayname,
addressbook.description,
principal,
id
)
.execute(&self.db)
.await?;
if result.rows_affected() == 0 {
return Err(Error::NotFound);
}
Ok(())
}
#[instrument]
async fn insert_addressbook(&self, addressbook: Addressbook) -> Result<(), Error> {
sqlx::query!(
r#"INSERT INTO addressbooks (principal, id, displayname, description)
VALUES (?, ?, ?, ?)"#,
addressbook.principal,
addressbook.id,
addressbook.displayname,
addressbook.description,
)
.execute(&self.db)
.await?;
Ok(())
}
#[instrument]
async fn delete_addressbook(
&self,
principal: &str,
addressbook_id: &str,
use_trashbin: bool,
) -> Result<(), Error> {
match use_trashbin {
true => {
sqlx::query!(
r#"UPDATE addressbooks SET deleted_at = datetime() WHERE (principal, id) = (?, ?)"#,
principal, addressbook_id
)
.execute(&self.db)
.await?;
}
false => {
sqlx::query!(
r#"DELETE FROM addressbooks WHERE (principal, id) = (?, ?)"#,
principal,
addressbook_id
)
.execute(&self.db)
.await?;
}
};
Ok(())
}
#[instrument]
async fn restore_addressbook(
&self,
principal: &str,
addressbook_id: &str,
) -> Result<(), Error> {
sqlx::query!(
r"UPDATE addressbooks SET deleted_at = NULL WHERE (principal, id) = (?, ?)",
principal,
addressbook_id
)
.execute(&self.db)
.await?;
Ok(())
}
#[instrument]
async fn sync_changes(
&self,
principal: &str,
addressbook_id: &str,
synctoken: i64,
) -> Result<(Vec<AddressObject>, Vec<String>, i64), Error> {
struct Row {
object_id: String,
synctoken: i64,
}
let changes = sqlx::query_as!(
Row,
r#"
SELECT DISTINCT object_id, max(0, synctoken) as "synctoken!: i64" from addressobjectchangelog
WHERE synctoken > ?
ORDER BY synctoken ASC
"#,
synctoken
)
.fetch_all(&self.db)
.await?;
let mut objects = vec![];
let mut deleted_objects = vec![];
let new_synctoken = changes
.last()
.map(|&Row { synctoken, .. }| synctoken)
.unwrap_or(0);
for Row { object_id, .. } in changes {
match self.get_object(principal, addressbook_id, &object_id).await {
Ok(object) => objects.push(object),
Err(Error::NotFound) => deleted_objects.push(object_id),
Err(err) => return Err(err),
}
}
Ok((objects, deleted_objects, new_synctoken))
}
#[instrument]
async fn get_objects(
&self,
principal: &str,
addressbook_id: &str,
) -> Result<Vec<AddressObject>, Error> {
sqlx::query_as!(
AddressObjectRow,
"SELECT id, vcf FROM addressobjects WHERE principal = ? AND addressbook_id = ? AND deleted_at IS NULL",
principal,
addressbook_id
)
.fetch_all(&self.db)
.await?
.into_iter()
.map(|row| row.try_into())
.collect()
}
#[instrument]
async fn get_object(
&self,
principal: &str,
addressbook_id: &str,
object_id: &str,
) -> Result<AddressObject, Error> {
Ok(sqlx::query_as!(
AddressObjectRow,
"SELECT id, vcf FROM addressobjects WHERE (principal, addressbook_id, id) = (?, ?, ?)",
principal,
addressbook_id,
object_id
)
.fetch_one(&self.db)
.await?
.try_into()?)
}
#[instrument]
async fn put_object(
&self,
principal: String,
addressbook_id: String,
object: AddressObject,
// TODO: implement
overwrite: bool,
) -> Result<(), Error> {
let mut tx = self.db.begin().await?;
let (object_id, vcf) = (object.get_id(), object.get_vcf());
sqlx::query!(
"REPLACE INTO addressobjects (principal, addressbook_id, id, vcf) VALUES (?, ?, ?, ?)",
principal,
addressbook_id,
object_id,
vcf
)
.execute(&mut *tx)
.await?;
log_object_operation(
&mut tx,
&principal,
&addressbook_id,
object_id,
ChangeOperation::Add,
)
.await?;
tx.commit().await?;
Ok(())
}
#[instrument]
async fn delete_object(
&self,
principal: &str,
addressbook_id: &str,
object_id: &str,
use_trashbin: bool,
) -> Result<(), Error> {
let mut tx = self.db.begin().await?;
match use_trashbin {
true => {
sqlx::query!(
"UPDATE addressobjects SET deleted_at = datetime(), updated_at = datetime() WHERE (principal, addressbook_id, id) = (?, ?, ?)",
principal,
addressbook_id,
object_id
)
.execute(&mut *tx)
.await?;
}
false => {
sqlx::query!(
"DELETE FROM addressobjects WHERE addressbook_id = ? AND id = ?",
addressbook_id,
object_id
)
.execute(&mut *tx)
.await?;
}
};
log_object_operation(
&mut tx,
principal,
addressbook_id,
object_id,
ChangeOperation::Delete,
)
.await?;
tx.commit().await?;
Ok(())
}
#[instrument]
async fn restore_object(
&self,
principal: &str,
addressbook_id: &str,
object_id: &str,
) -> Result<(), Error> {
let mut tx = self.db.begin().await?;
sqlx::query!(
r#"UPDATE addressobjects SET deleted_at = NULL, updated_at = datetime() WHERE (principal, addressbook_id, id) = (?, ?, ?)"#,
principal,
addressbook_id,
object_id
)
.execute(&mut *tx)
.await?;
log_object_operation(
&mut tx,
principal,
addressbook_id,
object_id,
ChangeOperation::Add,
)
.await?;
tx.commit().await?;
Ok(())
}
}

View File

@@ -1,351 +0,0 @@
use super::{ChangeOperation, SqliteStore};
use crate::model::object::CalendarObject;
use crate::model::Calendar;
use crate::{CalendarStore, Error};
use anyhow::Result;
use async_trait::async_trait;
use sqlx::Sqlite;
use sqlx::Transaction;
use tracing::instrument;
#[derive(Debug, Clone)]
struct CalendarObjectRow {
id: String,
ics: String,
}
impl TryFrom<CalendarObjectRow> for CalendarObject {
type Error = Error;
fn try_from(value: CalendarObjectRow) -> Result<Self, Error> {
CalendarObject::from_ics(value.id, value.ics)
}
}
// Logs an operation to the events
async fn log_object_operation(
tx: &mut Transaction<'_, Sqlite>,
principal: &str,
cal_id: &str,
object_id: &str,
operation: ChangeOperation,
) -> Result<(), Error> {
sqlx::query!(
r#"
UPDATE calendars
SET synctoken = synctoken + 1
WHERE (principal, id) = (?1, ?2)"#,
principal,
cal_id
)
.execute(&mut **tx)
.await?;
sqlx::query!(
r#"
INSERT INTO calendarobjectchangelog (principal, cal_id, object_id, operation, synctoken)
VALUES (?1, ?2, ?3, ?4, (
SELECT synctoken FROM calendars WHERE (principal, id) = (?1, ?2)
))"#,
principal,
cal_id,
object_id,
operation
)
.execute(&mut **tx)
.await?;
Ok(())
}
#[async_trait]
impl CalendarStore for SqliteStore {
#[instrument]
async fn get_calendar(&self, principal: &str, id: &str) -> Result<Calendar, Error> {
let cal = sqlx::query_as!(
Calendar,
r#"SELECT principal, id, synctoken, "order", displayname, description, color, timezone, deleted_at
FROM calendars
WHERE (principal, id) = (?, ?)"#,
principal,
id
)
.fetch_one(&self.db)
.await?;
Ok(cal)
}
#[instrument]
async fn get_calendars(&self, principal: &str) -> Result<Vec<Calendar>, Error> {
let cals = sqlx::query_as!(
Calendar,
r#"SELECT principal, id, synctoken, displayname, "order", description, color, timezone, deleted_at
FROM calendars
WHERE principal = ? AND deleted_at IS NULL"#,
principal
)
.fetch_all(&self.db)
.await?;
Ok(cals)
}
#[instrument]
async fn insert_calendar(&self, calendar: Calendar) -> Result<(), Error> {
sqlx::query!(
r#"INSERT INTO calendars (principal, id, displayname, description, "order", color, timezone)
VALUES (?, ?, ?, ?, ?, ?, ?)"#,
calendar.principal,
calendar.id,
calendar.displayname,
calendar.description,
calendar.order,
calendar.color,
calendar.timezone
)
.execute(&self.db)
.await?;
Ok(())
}
#[instrument]
async fn update_calendar(
&self,
principal: String,
id: String,
calendar: Calendar,
) -> Result<(), Error> {
let result = sqlx::query!(
r#"UPDATE calendars SET principal = ?, id = ?, displayname = ?, description = ?, "order" = ?, color = ?, timezone = ?
WHERE (principal, id) = (?, ?)"#,
calendar.principal,
calendar.id,
calendar.displayname,
calendar.description,
calendar.order,
calendar.color,
calendar.timezone,
principal,
id
).execute(&self.db).await?;
if result.rows_affected() == 0 {
return Err(Error::NotFound);
}
Ok(())
}
// Does not actually delete the calendar but just disables it
#[instrument]
async fn delete_calendar(
&self,
principal: &str,
id: &str,
use_trashbin: bool,
) -> Result<(), Error> {
match use_trashbin {
true => {
sqlx::query!(
r#"UPDATE calendars SET deleted_at = datetime() WHERE (principal, id) = (?, ?)"#,
principal, id
)
.execute(&self.db)
.await?;
}
false => {
sqlx::query!(
r#"DELETE FROM calendars WHERE (principal, id) = (?, ?)"#,
principal,
id
)
.execute(&self.db)
.await?;
}
};
Ok(())
}
#[instrument]
async fn restore_calendar(&self, principal: &str, id: &str) -> Result<(), Error> {
sqlx::query!(
r"UPDATE calendars SET deleted_at = NULL WHERE (principal, id) = (?, ?)",
principal,
id
)
.execute(&self.db)
.await?;
Ok(())
}
#[instrument]
async fn get_objects(
&self,
principal: &str,
cal_id: &str,
) -> Result<Vec<CalendarObject>, Error> {
sqlx::query_as!(
CalendarObjectRow,
"SELECT id, ics FROM calendarobjects WHERE principal = ? AND cal_id = ? AND deleted_at IS NULL",
principal,
cal_id
)
.fetch_all(&self.db)
.await?
.into_iter()
.map(|row| row.try_into())
.collect()
}
#[instrument]
async fn get_object(
&self,
principal: &str,
cal_id: &str,
object_id: &str,
) -> Result<CalendarObject, Error> {
Ok(sqlx::query_as!(
CalendarObjectRow,
"SELECT id, ics FROM calendarobjects WHERE (principal, cal_id, id) = (?, ?, ?)",
principal,
cal_id,
object_id
)
.fetch_one(&self.db)
.await?
.try_into()?)
}
#[instrument]
async fn put_object(
&self,
principal: String,
cal_id: String,
object: CalendarObject,
// TODO: implement
overwrite: bool,
) -> Result<(), Error> {
let mut tx = self.db.begin().await?;
let (object_id, ics) = (object.get_id(), object.get_ics());
sqlx::query!(
"REPLACE INTO calendarobjects (principal, cal_id, id, ics) VALUES (?, ?, ?, ?)",
principal,
cal_id,
object_id,
ics
)
.execute(&mut *tx)
.await?;
log_object_operation(
&mut tx,
&principal,
&cal_id,
object_id,
ChangeOperation::Add,
)
.await?;
tx.commit().await?;
Ok(())
}
#[instrument]
async fn delete_object(
&self,
principal: &str,
cal_id: &str,
id: &str,
use_trashbin: bool,
) -> Result<(), Error> {
let mut tx = self.db.begin().await?;
match use_trashbin {
true => {
sqlx::query!(
"UPDATE calendarobjects SET deleted_at = datetime(), updated_at = datetime() WHERE (principal, cal_id, id) = (?, ?, ?)",
principal,
cal_id,
id
)
.execute(&mut *tx)
.await?;
}
false => {
sqlx::query!(
"DELETE FROM calendarobjects WHERE cal_id = ? AND id = ?",
cal_id,
id
)
.execute(&mut *tx)
.await?;
}
};
log_object_operation(&mut tx, principal, cal_id, id, ChangeOperation::Delete).await?;
tx.commit().await?;
Ok(())
}
#[instrument]
async fn restore_object(
&self,
principal: &str,
cal_id: &str,
object_id: &str,
) -> Result<(), Error> {
let mut tx = self.db.begin().await?;
sqlx::query!(
r#"UPDATE calendarobjects SET deleted_at = NULL, updated_at = datetime() WHERE (principal, cal_id, id) = (?, ?, ?)"#,
principal,
cal_id,
object_id
)
.execute(&mut *tx)
.await?;
log_object_operation(&mut tx, principal, cal_id, object_id, ChangeOperation::Add).await?;
tx.commit().await?;
Ok(())
}
#[instrument]
async fn sync_changes(
&self,
principal: &str,
cal_id: &str,
synctoken: i64,
) -> Result<(Vec<CalendarObject>, Vec<String>, i64), Error> {
struct Row {
object_id: String,
synctoken: i64,
}
let changes = sqlx::query_as!(
Row,
r#"
SELECT DISTINCT object_id, max(0, synctoken) as "synctoken!: i64" from calendarobjectchangelog
WHERE synctoken > ?
ORDER BY synctoken ASC
"#,
synctoken
)
.fetch_all(&self.db)
.await?;
let mut objects = vec![];
let mut deleted_objects = vec![];
let new_synctoken = changes
.last()
.map(|&Row { synctoken, .. }| synctoken)
.unwrap_or(0);
for Row { object_id, .. } in changes {
match self.get_object(principal, cal_id, &object_id).await {
Ok(object) => objects.push(object),
Err(Error::NotFound) => deleted_objects.push(object_id),
Err(err) => return Err(err),
}
}
Ok((objects, deleted_objects, new_synctoken))
}
}

View File

@@ -1,44 +0,0 @@
use serde::Serialize;
use sqlx::{sqlite::SqliteConnectOptions, Pool, Sqlite, SqlitePool};
pub mod addressbook_store;
pub mod calendar_store;
#[derive(Debug, Clone, Serialize, sqlx::Type)]
#[serde(rename_all = "kebab-case")]
pub(crate) enum ChangeOperation {
// There's no distinction between Add and Modify
Add,
Delete,
}
#[derive(Debug)]
pub struct SqliteStore {
db: SqlitePool,
}
impl SqliteStore {
pub fn new(db: SqlitePool) -> Self {
Self { db }
}
}
pub async fn create_db_pool(db_url: &str, migrate: bool) -> anyhow::Result<Pool<Sqlite>> {
let db = SqlitePool::connect_with(
SqliteConnectOptions::new()
.filename(db_url)
.create_if_missing(true),
)
.await?;
if migrate {
println!("Running database migrations");
sqlx::migrate!("./migrations").run(&db).await?;
}
Ok(db)
}
pub async fn create_test_store() -> anyhow::Result<SqliteStore> {
let db = SqlitePool::connect("sqlite::memory:").await?;
sqlx::migrate!("./migrations").run(&db).await?;
Ok(SqliteStore::new(db))
}