sqlite_store: Add option to skip broken objects and add validation on start-up

This commit is contained in:
Lennart
2026-01-19 14:48:21 +01:00
parent 0eef4ffabf
commit 15e1509fe3
9 changed files with 156 additions and 113 deletions

View File

@@ -2,6 +2,7 @@ use super::ChangeOperation;
use crate::BEGIN_IMMEDIATE;
use async_trait::async_trait;
use derive_more::derive::Constructor;
use ical::parser::ParserError;
use rustical_ical::AddressObject;
use rustical_store::{
Addressbook, AddressbookStore, CollectionMetadata, CollectionOperation,
@@ -9,7 +10,7 @@ use rustical_store::{
};
use sqlx::{Acquire, Executor, Sqlite, SqlitePool, Transaction};
use tokio::sync::mpsc::Sender;
use tracing::{error_span, instrument, warn};
use tracing::{error, error_span, instrument, warn};
pub mod birthday_calendar;
@@ -18,6 +19,12 @@ struct AddressObjectRow {
id: String,
vcf: String,
}
impl From<AddressObjectRow> for (String, Result<AddressObject, ParserError>) {
fn from(row: AddressObjectRow) -> Self {
let result = AddressObject::from_vcf(row.vcf);
(row.id, result)
}
}
impl TryFrom<AddressObjectRow> for (String, AddressObject) {
type Error = rustical_store::Error;
@@ -31,6 +38,7 @@ impl TryFrom<AddressObjectRow> for (String, AddressObject) {
pub struct SqliteAddressbookStore {
db: SqlitePool,
sender: Sender<CollectionOperation>,
skip_broken: bool,
}
impl SqliteAddressbookStore {
@@ -88,6 +96,36 @@ impl SqliteAddressbookStore {
Ok(())
}
#[allow(clippy::missing_panics_doc)]
pub async fn validate_objects(&self, principal: &str) -> Result<(), Error> {
let mut success = true;
for addressbook in self.get_addressbooks(principal).await? {
for (object_id, res) in Self::_get_objects(&self.db, principal, &addressbook.id).await?
{
if let Err(err) = res {
warn!(
"Invalid address object found at {principal}/{addr_id}/{object_id}.vcf. Error: {err}",
addr_id = addressbook.id
);
success = false;
}
}
}
if !success {
if self.skip_broken {
error!(
"Not all address objects are valid. Since data_store.sqlite.skip_broken=true they will be hidden. You are still advised to manually remove or repair the object. If you need help feel free to open up an issue on GitHub."
);
} else {
error!(
"Not all address objects are valid. Since data_store.sqlite.skip_broken=false this causes a panic. Remove or repair the broken objects manually or set data_store.sqlite.skip_broken=false as a temporary solution to ignore the error. If you need help feel free to open up an issue on GitHub."
);
panic!();
}
}
Ok(())
}
// Logs an operation to an address object
async fn log_object_operation(
tx: &mut Transaction<'_, Sqlite>,
@@ -134,7 +172,7 @@ impl SqliteAddressbookStore {
if let Err(err) = self.sender.try_send(CollectionOperation { topic, data }) {
error_span!(
"Error trying to send addressbook update notification:",
err = format!("{err:?}"),
err = format!("{err}"),
);
}
}
@@ -353,8 +391,8 @@ impl SqliteAddressbookStore {
executor: E,
principal: &str,
addressbook_id: &str,
) -> Result<Vec<(String, AddressObject)>, rustical_store::Error> {
sqlx::query_as!(
) -> Result<impl Iterator<Item = (String, Result<AddressObject, ParserError>)>, Error> {
Ok(sqlx::query_as!(
AddressObjectRow,
"SELECT id, vcf FROM addressobjects WHERE principal = ? AND addressbook_id = ? AND deleted_at IS NULL",
principal,
@@ -363,8 +401,8 @@ impl SqliteAddressbookStore {
.fetch_all(executor)
.await.map_err(crate::Error::from)?
.into_iter()
.map(std::convert::TryInto::try_into)
.collect()
.map(Into::into)
)
}
async fn _get_object<'e, E: Executor<'e, Database = Sqlite>>(
@@ -607,7 +645,16 @@ impl AddressbookStore for SqliteAddressbookStore {
principal: &str,
addressbook_id: &str,
) -> Result<Vec<(String, AddressObject)>, rustical_store::Error> {
Self::_get_objects(&self.db, principal, addressbook_id).await
let objects = Self::_get_objects(&self.db, principal, addressbook_id).await?;
if self.skip_broken {
Ok(objects
.filter_map(|(id, res)| Some((id, res.ok()?)))
.collect())
} else {
Ok(objects
.map(|(id, res)| res.map(|obj| (id, obj)))
.collect::<Result<Vec<_>, _>>()?)
}
}
#[instrument]

View File

@@ -3,6 +3,7 @@ use crate::BEGIN_IMMEDIATE;
use async_trait::async_trait;
use chrono::TimeDelta;
use derive_more::derive::Constructor;
use ical::parser::ParserError;
use ical::types::CalDateTime;
use regex::Regex;
use rustical_ical::{CalendarObject, CalendarObjectType};
@@ -13,7 +14,7 @@ use rustical_store::{CollectionOperation, CollectionOperationInfo};
use sqlx::types::chrono::NaiveDateTime;
use sqlx::{Acquire, Executor, Sqlite, SqlitePool, Transaction};
use tokio::sync::mpsc::Sender;
use tracing::{error_span, instrument, warn};
use tracing::{error, error_span, instrument, warn};
#[derive(Debug, Clone)]
struct CalendarObjectRow {
@@ -22,6 +23,23 @@ struct CalendarObjectRow {
uid: String,
}
impl From<CalendarObjectRow> for (String, Result<CalendarObject, ParserError>) {
fn from(row: CalendarObjectRow) -> Self {
let result = CalendarObject::from_ics(row.ics).inspect(|object| {
if object.get_uid() != row.uid {
warn!(
"Calendar object {}.ics: UID={} and row uid={} do not match",
row.id,
object.get_uid(),
row.uid
);
}
});
(row.id, result)
}
}
impl TryFrom<CalendarObjectRow> for (String, CalendarObject) {
type Error = rustical_store::Error;
@@ -92,6 +110,7 @@ impl From<CalendarRow> for Calendar {
pub struct SqliteCalendarStore {
db: SqlitePool,
sender: Sender<CollectionOperation>,
skip_broken: bool,
}
impl SqliteCalendarStore {
@@ -141,11 +160,40 @@ impl SqliteCalendarStore {
if let Err(err) = self.sender.try_send(CollectionOperation { topic, data }) {
error_span!(
"Error trying to send calendar update notification:",
err = format!("{err:?}"),
err = format!("{err}"),
);
}
}
#[allow(clippy::missing_panics_doc)]
pub async fn validate_objects(&self, principal: &str) -> Result<(), Error> {
let mut success = true;
for calendar in self.get_calendars(principal).await? {
for (object_id, res) in Self::_get_objects(&self.db, principal, &calendar.id).await? {
if let Err(err) = res {
warn!(
"Invalid calendar object found at {principal}/{cal_id}/{object_id}.ics. Error: {err}",
cal_id = calendar.id
);
success = false;
}
}
}
if !success {
if self.skip_broken {
error!(
"Not all calendar objects are valid. Since data_store.sqlite.skip_broken=true they will be hidden. You are still advised to manually remove or repair the object. If you need help feel free to open up an issue on GitHub."
);
} else {
error!(
"Not all calendar objects are valid. Since data_store.sqlite.skip_broken=false this causes a panic. Remove or repair the broken objects manually or set data_store.sqlite.skip_broken=false as a temporary solution to ignore the error. If you need help feel free to open up an issue on GitHub."
);
panic!();
}
}
Ok(())
}
/// In the past exports generated objects with invalid VERSION:4.0
/// This repair sets them to VERSION:2.0
#[allow(clippy::missing_panics_doc)]
@@ -456,8 +504,8 @@ impl SqliteCalendarStore {
executor: E,
principal: &str,
cal_id: &str,
) -> Result<Vec<(String, CalendarObject)>, Error> {
sqlx::query_as!(
) -> Result<impl Iterator<Item = (String, Result<CalendarObject, ParserError>)>, Error> {
Ok(sqlx::query_as!(
CalendarObjectRow,
"SELECT id, uid, ics FROM calendarobjects WHERE principal = ? AND cal_id = ? AND deleted_at IS NULL",
principal,
@@ -466,8 +514,8 @@ impl SqliteCalendarStore {
.fetch_all(executor)
.await.map_err(crate::Error::from)?
.into_iter()
.map(std::convert::TryInto::try_into)
.collect()
.map(Into::into)
)
}
async fn _calendar_query<'e, E: Executor<'e, Database = Sqlite>>(
@@ -475,14 +523,14 @@ impl SqliteCalendarStore {
principal: &str,
cal_id: &str,
query: CalendarQuery,
) -> Result<Vec<(String, CalendarObject)>, Error> {
) -> Result<impl Iterator<Item = (String, Result<CalendarObject, ParserError>)>, Error> {
// We extend our query interval by one day in each direction since we really don't want to
// miss any objects because of timezone differences
// I've previously tried NaiveDate::MIN,MAX, but it seems like sqlite cannot handle these
let start = query.time_start.map(|start| start - TimeDelta::days(1));
let end = query.time_end.map(|end| end + TimeDelta::days(1));
sqlx::query_as!(
Ok(sqlx::query_as!(
CalendarObjectRow,
r"SELECT id, uid, ics FROM calendarobjects
WHERE principal = ? AND cal_id = ? AND deleted_at IS NULL
@@ -500,8 +548,7 @@ impl SqliteCalendarStore {
.await
.map_err(crate::Error::from)?
.into_iter()
.map(std::convert::TryInto::try_into)
.collect()
.map(Into::into))
}
async fn _get_object<'e, E: Executor<'e, Database = Sqlite>>(
@@ -641,6 +688,7 @@ impl SqliteCalendarStore {
principal: &str,
cal_id: &str,
synctoken: i64,
skip_broken: bool,
) -> Result<(Vec<(String, CalendarObject)>, Vec<String>, i64), Error> {
struct Row {
object_id: String,
@@ -670,6 +718,8 @@ impl SqliteCalendarStore {
match Self::_get_object(&mut *conn, principal, cal_id, &object_id, false).await {
Ok(object) => objects.push((object_id, object)),
Err(rustical_store::Error::NotFound) => deleted_objects.push(object_id),
// Skip broken object
Err(rustical_store::Error::IcalError(_)) if skip_broken => (),
Err(err) => return Err(err),
}
}
@@ -820,7 +870,16 @@ impl CalendarStore for SqliteCalendarStore {
cal_id: &str,
query: CalendarQuery,
) -> Result<Vec<(String, CalendarObject)>, Error> {
Self::_calendar_query(&self.db, principal, cal_id, query).await
let objects = Self::_calendar_query(&self.db, principal, cal_id, query).await?;
if self.skip_broken {
Ok(objects
.filter_map(|(id, res)| Some((id, res.ok()?)))
.collect())
} else {
Ok(objects
.map(|(id, res)| res.map(|obj| (id, obj)))
.collect::<Result<Vec<_>, _>>()?)
}
}
async fn calendar_metadata(
@@ -851,7 +910,16 @@ impl CalendarStore for SqliteCalendarStore {
principal: &str,
cal_id: &str,
) -> Result<Vec<(String, CalendarObject)>, Error> {
Self::_get_objects(&self.db, principal, cal_id).await
let objects = Self::_get_objects(&self.db, principal, cal_id).await?;
if self.skip_broken {
Ok(objects
.filter_map(|(id, res)| Some((id, res.ok()?)))
.collect())
} else {
Ok(objects
.map(|(id, res)| res.map(|obj| (id, obj)))
.collect::<Result<Vec<_>, _>>()?)
}
}
#[instrument]
@@ -974,7 +1042,7 @@ impl CalendarStore for SqliteCalendarStore {
cal_id: &str,
synctoken: i64,
) -> Result<(Vec<(String, CalendarObject)>, Vec<String>, i64), Error> {
Self::_sync_changes(&self.db, principal, cal_id, synctoken).await
Self::_sync_changes(&self.db, principal, cal_id, synctoken, self.skip_broken).await
}
fn is_read_only(&self, _cal_id: &str) -> bool {

View File

@@ -18,7 +18,7 @@ impl From<sqlx::Error> for Error {
sqlx::Error::RowNotFound => Self::StoreError(rustical_store::Error::NotFound),
sqlx::Error::Database(err) => {
if err.is_unique_violation() {
warn!("{err:?}");
warn!("{err}");
Self::StoreError(rustical_store::Error::AlreadyExists)
} else {
Self::SqlxError(sqlx::Error::Database(err))

View File

@@ -52,8 +52,8 @@ pub async fn test_store_context() -> TestStoreContext {
let db = get_test_db().await;
TestStoreContext {
db: db.clone(),
addr_store: SqliteAddressbookStore::new(db.clone(), send_addr),
cal_store: SqliteCalendarStore::new(db.clone(), send_cal),
addr_store: SqliteAddressbookStore::new(db.clone(), send_addr, false),
cal_store: SqliteCalendarStore::new(db.clone(), send_cal, false),
principal_store: SqlitePrincipalStore::new(db.clone()),
sub_store: SqliteStore::new(db),
}