Add support for CalDAV

This commit is contained in:
2025-08-09 10:08:35 +01:00
parent 5e7a17223e
commit f1496a2194
6 changed files with 330 additions and 140 deletions

View File

@@ -1,35 +1,32 @@
#!/usr/bin/env python3
import argparse
import configparser
from datetime import datetime
import logging
import os
import pytz
from datetime import timedelta
from xml.sax.saxutils import escape as xml_escape
import arrow
import caldav
import foursquare
from dateutil.tz import tzoffset
from ics import Calendar, Event
import simplekml
logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger(__name__)
current_dir = os.path.realpath(os.path.dirname(__file__))
CONFIG_FILE = os.path.join(current_dir, "config.ini")
# The kinds of file we can generate:
VALID_KINDS = ["ics", "kml"]
VALID_KINDS = ["ics", "kml", "caldav"]
class FeedGenerator:
fetch = "recent"
def __init__(self, fetch="recent"):
"Loads config, sets up Foursquare API client."
self.fetch = fetch
self.logger = logging.getLogger(self.__class__.__name__)
self._load_config(CONFIG_FILE)
@@ -42,14 +39,18 @@ class FeedGenerator:
try:
config.read_file(open(config_file))
except IOError:
logger.critical("Can't read config file: " + config_file)
self.logger.critical("Can't read config file: " + config_file)
exit()
self.api_access_token = config.get("Foursquare", "AccessToken")
self.ics_filepath = config.get("Local", "IcsFilepath")
self.kml_filepath = config.get("Local", "KmlFilepath")
self.caldav_url = config.get("CalDAV", "url", fallback=None)
self.caldav_username = config.get("CalDAV", "username", fallback=None)
self.caldav_password = config.get("CalDAV", "password", fallback=None)
self.caldav_calendar_name = config.get("CalDAV", "calendar_name", fallback="Foursquare")
def generate(self, kind="ics"):
def generate(self, kind: str = "ics"):
"Call this to fetch the data from the API and generate the file."
if kind not in VALID_KINDS:
raise ValueError("kind should be one of {}.".format(", ".join(VALID_KINDS)))
@@ -60,47 +61,49 @@ class FeedGenerator:
checkins = self._get_recent_checkins()
plural = "" if len(checkins) == 1 else "s"
logger.info("Fetched {} checkin{} from the API".format(len(checkins), plural))
self.logger.info("Fetched {} checkin{} from the API".format(len(checkins), plural))
if kind == "ics":
filepath = self._generate_ics_file(checkins)
elif kind == "kml":
filepath = self._generate_kml_file(checkins)
logger.info("Generated file {}".format(filepath))
self.logger.info("Generated file {}".format(filepath))
exit(0)
def _get_recent_checkins(self):
def _get_recent_checkins(self) -> list:
"Make one request to the API for the most recent checkins."
results = self._get_checkins_from_api()
return results["checkins"]["items"]
def _get_all_checkins(self):
def _get_all_checkins(self) -> list:
"Make multiple requests to the API to get ALL checkins."
offset = 0
checkins = []
# Temporary total:
total_checkins = 9999999999
self.logger.debug("Fetching all checkins...")
# Loop until we have fetched all checkins:
while offset < total_checkins:
results = self._get_checkins_from_api(offset)
self.logger.debug("Got {} checkins from API with offset {}".format(
results["checkins"]["count"], offset
))
if offset == 0:
# First time, set the correct total:
total_checkins = results["checkins"]["count"]
plural = "" if total_checkins == 1 else "s"
logger.debug("{} checkin{} to fetch".format(total_checkins, plural))
self.logger.debug("{} checkin{} to fetch".format(total_checkins, plural))
logger.debug("Fetched {}-{}".format(offset + 1, offset + 250))
self.logger.debug("Fetched {}-{}".format(offset + 1, offset + 250))
checkins += results["checkins"]["items"]
offset += 250
return checkins
def _get_checkins_from_api(self, offset=0):
def _get_checkins_from_api(self, offset: int = 0) -> list:
"""Returns a list of recent checkins for the authenticated user.
Keyword arguments:
@@ -109,11 +112,13 @@ class FeedGenerator:
"""
try:
return self.client.users.checkins(
res = self.client.users.checkins(
params={"limit": 250, "offset": offset, "sort": "newestfirst"}
)
self.logger.debug("Results: {}".format(res))
return res
except foursquare.FoursquareException as err:
logger.error(
self.logger.error(
"Error getting checkins, with offset of {}: {}".format(offset, err)
)
exit(1)
@@ -123,12 +128,12 @@ class FeedGenerator:
try:
user = self.client.users()
except foursquare.FoursquareException as err:
logger.error("Error getting user: {}".format(err))
self.logger.error("Error getting user: {}".format(err))
exit(1)
return user["user"]
def _generate_ics_file(self, checkins):
def _generate_ics_file(self, checkins: list) -> str:
"""Supplied with a list of checkin data from the API, generates
and saves a .ics file.
@@ -144,7 +149,7 @@ class FeedGenerator:
return self.ics_filepath
def _generate_calendar(self, checkins):
def _generate_calendar(self, checkins: list) -> Calendar:
"""Supplied with a list of checkin data from the API, generates
an ics Calendar object and returns it.
@@ -165,19 +170,30 @@ class FeedGenerator:
tz_offset = self._get_checkin_timezone(checkin)
e = Event()
start = arrow.get(checkin["createdAt"]).replace(tzinfo=tz_offset)
e.name = "@ {}".format(venue_name)
e.location = venue_name
e.url = "{}/checkin/{}".format(user["canonicalUrl"], checkin["id"])
e.uid = "{}@foursquare.com".format(checkin["id"])
e.begin = checkin["createdAt"]
e.uid = checkin["id"]
e.begin = start
e.end = start + timedelta(minutes=15)
e.created = start + timedelta(minutes=15)
e.last_modified = start + timedelta(minutes=15)
# Use the 'shout', if any, and the timezone offset in the
# description.
description = []
if "shout" in checkin and len(checkin["shout"]) > 0:
description = [checkin["shout"]]
description.append("Timezone offset: {}".format(tz_offset))
if "beenHere" in checkin and checkin["beenHere"]['lastCheckinExpiredAt'] > 0:
description.append(
"It has been {} days since you last checked in here.".format(
(start - arrow.get(checkin["beenHere"]["lastCheckinExpiredAt"])).days
)
)
if "isMayor" in checkin and checkin["isMayor"]:
description.append("At this time, you were the mayor of this venue!")
e.description = "\n".join(description)
# Use the venue_name and the address, if any, for the location.
@@ -202,6 +218,8 @@ class FeedGenerator:
Keyword arguments:
checkins -- A list of dicts, each one data about a single checkin.
"""
import simplekml
user = self._get_user()
kml = simplekml.Kml()
@@ -249,11 +267,10 @@ class FeedGenerator:
# Foursquare's KML feeds had 'updated' and 'published' elements
# in the Placemark, but I don't *think* those are standard, so:
pnt.timestamp.when = (
datetime.utcfromtimestamp(checkin["createdAt"])
.replace(tzinfo=pytz.utc)
.isoformat()
)
pnt.timestamp.when = arrow.get(
checkin["createdAt"],
tzinfo=self._get_checkin_timezone(checkin),
).isoformat()
# Use the address, if any:
if "location" in checkin["venue"]:
@@ -269,37 +286,79 @@ class FeedGenerator:
return self.kml_filepath
def _get_checkin_timezone(self, checkin):
"""Given a checkin from the API, returns a string representing the
timezone offset of that checkin.
In the API they're given as a number of minutes, positive or negative.
e.g. if offset is 60, this returns '+01:00'
if offset is 0, this returns '+00:00'
if offset is -480, this returns '-08:00'
"""Given a checkin from the API, returns an arrow timezone object
representing the timezone offset of that checkin.
Keyword arguments
checkin -- A dict of data about a single checkin
"""
# In minutes, e.g. 60 or -480
minutes = checkin["timeZoneOffset"]
return tzoffset(None, checkin["timeZoneOffset"] * 60)
# e.g. 1 or -8
hours = minutes / 60
def sync_calendar_to_caldav(self):
"""
Syncs all events from the generated calendar to a CalDAV server.
Uses credentials and URL from the instance config.
"""
# e.g. '01.00' or '-08.00'
if hours >= 0:
offset = "{:05.2f}".format(hours)
symbol = "+"
if self.fetch == "all":
checkins = self._get_all_checkins()
else:
offset = "{:06.2f}".format(hours)
symbol = ""
checkins = self._get_recent_checkins()
calendar = self._generate_calendar(checkins)
# e.g. '+01:00' or '-08.00'
return "{}{}".format(symbol, offset).replace(".", ":")
# Connect to CalDAV server using instance variables
client = caldav.DAVClient(
url=self.caldav_url,
username=self.caldav_username,
password=self.caldav_password,
)
principal = client.principal()
# Try to find the calendar, or create it if it doesn't exist
calendars = principal.calendars()
self.logger.debug("Found {} calendars on the server".format(len(calendars)))
cal = None
for c in calendars:
if c.name.strip() == self.caldav_calendar_name:
cal = c
self.logger.info("Found existing calendar: {}".format(cal.name))
break
if cal is None:
self.logger.info("Creating new calendar: {}".format(self.caldav_calendar_name))
cal = principal.make_calendar(name=self.caldav_calendar_name)
self.logger.debug("Calendar has {} events".format(len(calendar.events)))
# Upload each event from the ics.Calendar object
for event in calendar.events:
# Each event must have a unique UID
# Use the event UID if present, otherwise generate a deterministic one from checkin ID
if not event.uid:
# Try to extract checkin ID from event.url or event.name as fallback
checkin_id = None
if hasattr(event, "url") and event.url:
# URL format: .../checkin/<checkin_id>
parts = event.url.rstrip("/").split("/")
if "checkin" in parts:
idx = parts.index("checkin")
if idx + 1 < len(parts):
checkin_id = parts[idx + 1]
if not checkin_id and hasattr(event, "uid") and event.uid:
# fallback: try to parse from event.uid
if "@" in event.uid:
checkin_id = event.uid.split("@")[0]
if not checkin_id:
# fallback: use event.name
checkin_id = event.name
# Generate a repeatable UID using a namespace and checkin_id
event.uid = "{}@foursquare.com".format(checkin_id)
self.logger.debug("Uploading event with UID: {}".format(event.uid))
cal.add_event(event.serialize())
if __name__ == "__main__":
def main():
"""Main function to parse arguments and run the FeedGenerator."""
# Set up argument parser
parser = argparse.ArgumentParser(
description="Makes a .ics file from your Foursquare/Swarm checkins"
)
@@ -316,7 +375,9 @@ if __name__ == "__main__":
"-k",
"--kind",
action="store",
help="Either ics (default) or kml",
help="Either ics, kml, or caldav. Default is ics.",
choices=VALID_KINDS,
default="ics",
required=False,
type=str,
)
@@ -332,27 +393,26 @@ if __name__ == "__main__":
args = parser.parse_args()
if args.verbose == 1:
logger.setLevel(logging.INFO)
elif args.verbose == 2:
logger.setLevel(logging.DEBUG)
level = logging.DEBUG
else:
logger.setLevel(logging.WARNING)
level = logging.INFO
logging.basicConfig(level=level)
if args.all:
to_fetch = "all"
else:
to_fetch = "recent"
if args.kind:
if args.kind in VALID_KINDS:
kind = args.kind
else:
raise ValueError("kind should be one of {}.".format(", ".join(VALID_KINDS)))
else:
kind = "ics"
generator = FeedGenerator(fetch=to_fetch)
generator.generate(kind=kind)
if args.kind == "caldav":
generator.sync_calendar_to_caldav()
else:
# Generate the requested kind of file
generator.generate(kind=args.kind)
exit(0)
return 0
if __name__ == "__main__":
import sys
sys.exit(main())