I’ve been banging my head against the wall for hours on this.
I have a Calendar model that has a one-to-many relationship with the CalendarEvent model. Each User model is given a one-to-one relationship with a Calendar instance. In relevant part, the CalendarEvent model has a datetime field that represents the start time of the event.
In my user model, I can pull all my events in the default_calendar with self.default_calendar.events.all(), and they all show up. I can list them out and print their datetime.date(). Events are being created, stored, and can be recalled. Cool.
The problem is that len(self.default_calendar.events.filter(datetime__date=date)) always returns zero. It is never not zero. Doesn’t throw an error or exception. It just returns no results, even when date is set with a datetime pulled from the events listed by all().
I’ve checked if it’s a tz issue, and all elements are returning as timezone aware, including the date being used in the filter.
I’m at a loss here.
EDIT:
Here is additional information:
python 3.10.12
Django 5.0.4
Big picture, this part of the project is to track scheduling for a law office and update users’ Google calendars. This was working, but I recently moved my development environment from Windows to a Linux VM in order to incorporate Celery.
I’ve traced the issue from data entry to the exact spot in the code where it goes off the rails. For brevity, I’m removing code that isn’t relevant to this particular string of operations. I’m also removing confidential information from the outputs.
UpdateDataFromEfiling.py pulls in scheduling data for cases:
def _load_place_ids(csv_path='government_contacts.csv'):
    """Read the contacts CSV once and return a ``{normalised name: place_id}`` dict.

    Names are stripped and lower-cased so lookups are case-insensitive. If a
    name appears on several rows the first row wins, which matches scanning the
    file from the top and stopping at the first hit.
    """
    place_ids = {}
    # newline='' lets the csv module do its own line-ending handling.
    with open(csv_path, mode='r', encoding='utf-8', newline='') as csvfile:
        for row in csv.DictReader(csvfile):
            name = row['name']
            if name is None:
                # Short row: the name column is missing, nothing to index.
                continue
            place_ids.setdefault(name.strip().lower(), row['place_id'])
    return place_ids


def _get_calendar_events(driver, user, county, case_number, default_official):
    """Scrape the upcoming events from the activities table of the current page.

    Each table row becomes one event dict. Rows whose start time is already in
    the past are skipped. The location text is resolved to a ``place_id`` via
    ``government_contacts.csv``; if it is not found (or has an empty id), the
    entry for "<county> County Courthouse" is used instead.

    Parameters:
        driver: Browser driver exposing ``find_elements(By.XPATH, ...)``.
        user: Passed through to ``_get_event_length``.
        county (str): County name used to build the fallback courthouse name.
        case_number: Passed through to ``_get_event_length``.
        default_official: Stored unchanged on every event dict.

    Returns:
        list[dict]: One dict per future event with the keys ``datetime``,
        ``duration``, ``location``, ``title``, ``official``,
        ``default_official`` and ``event_type``.
    """
    events = []
    # Loaded on first use so the CSV is only touched when a future event exists,
    # and parsed once instead of up to twice per table row.
    place_ids = None
    fallback_key = f"{county} County Courthouse".strip().lower()

    events_trs = driver.find_elements(By.XPATH, "//*[@id='activities']/div/div/div/table/tbody/tr")
    for tr in events_trs:
        # Extracting the date and time text
        date_str = tr.find_element(By.XPATH, "./td[1]").text.strip()
        time_str = tr.find_element(By.XPATH, "./td[2]").text.strip()
        start_datetime = _combine_date_and_time(date_str, time_str)
        if start_datetime < datetime.now(start_datetime.tzinfo):
            continue

        duration = _get_event_length(user, start_datetime, case_number)
        if not duration:
            duration = timedelta(minutes=30)

        location_text = _format_location_text(tr.find_element(By.XPATH, "./td[3]").text.strip())
        if place_ids is None:
            place_ids = _load_place_ids()

        place_id = place_ids.get(location_text.strip().lower())
        if not place_id and fallback_key in place_ids:
            # No usable match for the scraped location: use the county courthouse.
            place_id = place_ids[fallback_key]

        # Create the event dictionary
        event = {
            'datetime': start_datetime,
            'duration': duration,
            'location': place_id,
            'title': tr.find_element(By.XPATH, "./td[4]").text,
            'official': tr.find_element(By.XPATH, "./td[6]").text,
            'default_official': default_official,
            'event_type' : 'court',
        }
        events.append(event)
        print(f"Event {event['title']} added to event list from efiling")
    return events
Output:
Event Telephone scheduling conference added to event list from efiling
The event dictionary is attached to a larger dictionary of case data and passed into my Django CircuitCase model’s create_or_update_case method:
class AssignedCase(CalendaredModel):
    """Case model holding the presiding judge, communication logs and court calendar."""

    # Presiding judge; the link is cleared (not cascaded) if the contact is deleted.
    judge = models.ForeignKey(
        Contact,
        null=True,
        blank=True,
        on_delete=models.SET_NULL,
        related_name='%(class)s_cases_presiding',
    )
    communication_logs = models.ManyToManyField(
        'logs.CommunicationLog',
        blank=True,
        related_name='%(class)s_cases_in',
    )
    viewed = GenericRelation('oauth_app.User')
    # PROTECT: a calendar that is still attached to a case cannot be deleted.
    court_calendar = models.OneToOneField(
        'calendars.Calendar',
        null=True,
        blank=True,
        on_delete=models.PROTECT,
        related_name='assigned_case_court_calendar',
    )
    default_event_location = models.ForeignKey(
        Address,
        null=True,
        blank=True,
        on_delete=models.PROTECT,
    )
class CircuitCase(AssignedCase):
    """Circuit court case, looked up by county and case number."""

    case_type = models.CharField(max_length=50, null=True, blank=True)
    county = models.CharField(max_length=15)
    branch = models.SmallIntegerField(default=1, null=True, blank=True)
    case_num = models.CharField(max_length=15)
    caption = models.CharField(max_length=100)
    force_update = models.BooleanField(default=False)
    filing_date = models.DateField(default='1900-01-01')
    last_updated = models.DateTimeField(default=timezone.now)
    ccap_notes_updated = models.DateField(default='1980-01-01')

    @classmethod
    def create_or_update_case(cls, case_data, user=None):
        """Create or update the case described by ``case_data`` and its related records.

        Parameters:
            case_data (dict): Must contain ``county``, ``caption``, ``case_num``,
                ``filing_date`` and ``parties``. Optional keys: ``judge``,
                ``case_type``, ``branch``, ``witnesses``, ``charges``,
                ``ccap_notes`` and ``events``. Keys whose value is ``None`` are
                treated as absent.
            user (User, optional): Set as the current user for the duration of
                the call and forwarded to the related update methods.

        Returns:
            tuple: ``(circuit_case, created)`` as returned by ``update_or_create``.

        Raises:
            ValueError: If a required key is missing from ``case_data``.
        """
        with SPD_Base.current_user(user):
            # Treat None as "not supplied" so the .get() fallbacks below kick in.
            supplied = {
                field: value
                for field, value in case_data.items()
                if value is not None
            }

            # Required fields
            try:
                county = supplied['county']
                caption = supplied['caption']
                case_number = CircuitCase._get_proper_casenumber(supplied['case_num'])
                filing_date = supplied['filing_date']
                parties = supplied['parties']
            except KeyError as e:
                raise ValueError(f"Missing required case_data field: {e}")

            # Values written to the row whether it is new or already exists.
            field_values = {
                'judge': supplied.get('judge'),
                'case_type': supplied.get('case_type'),
                'branch': supplied.get('branch', 1),
                'caption': caption,
                'filing_date': filing_date,
                'force_update': False,
                # Passed as a callable; update_or_create() accepts callables in defaults.
                'last_updated': timezone.now,
            }
            circuit_case, created = cls.objects.update_or_create(
                county=county,
                case_num=case_number,
                defaults=field_values,
            )

            # Related records, in the same order as before.
            circuit_case.update_or_create_parties(parties, user=user)
            circuit_case.update_or_create_witnesses(supplied.get('witnesses', []), user=user)
            circuit_case.update_or_create_charges(supplied.get('charges', []), user=user)
            circuit_case.update_or_create_ccap_notes(supplied.get('ccap_notes', []), user=user)
            circuit_case.court_calendar.update_or_create_events(supplied.get('events', []), user=user)
            return circuit_case, created
The event dictionary is extracted and passed into the update_or_create_events method of the court_calendar instance…
class Calendar(SPD_Base):
    """Calendar of events, optionally attached to a parent object through a generic foreign key."""

    #* Model Fields
    title = models.CharField(max_length=100, default="Automated Calendar")
    description = models.TextField(null=True, blank=True)
    locked = models.BooleanField(default=False)
    content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE, null=True)
    object_id = models.PositiveIntegerField(null=True)
    parent_object = GenericForeignKey('content_type', 'object_id')
    google_calendar_id = models.CharField(max_length=255, blank=True, null=True)

    #* Constructor
    #* Class Methods and Static methods
    #* Properties
    @property
    def assigned_case(self):
        """Return the first assigned case related to this calendar, or None."""
        return self.assignedcase_calendared_models.first()

    @property
    def case_caption(self):
        """Return the parent case's ``calendar_caption``.

        Raises:
            ValueError: If ``parent_object`` is not an ``AssignedCase``.
        """
        from apps.spd_files.models import AssignedCase
        if isinstance(self.parent_object, AssignedCase):
            assigned_case = self.parent_object
            return assigned_case.calendar_caption
        else:
            raise ValueError("The parent_object is not an instance of AssignedCase")

    #* Public Methods
    def update_or_create_events(self, events, user=None):
        """
        Updates or creates events for the calendar based on a provided list of event details.
        This method first identifies and deletes any future events that are not present in the
        provided list, assuming such events have been canceled. Then, it filters out events that
        already exist in the calendar to prevent duplicates. Finally, it creates new events for
        any remaining entries in the provided list.

        Events are compared by the tuple (title, datetime, duration, official name).

        Parameters:
        events (list of dict): A list of dictionaries, each containing details of an event to be
            updated or created. Each needs the keys 'title', 'datetime', 'duration' and 'official'.
        user (User, optional): The user responsible for the update or creation of events.
            Forwarded to delete_event() and add_event().
        """
        future_events = self.events.filter(datetime__gte=timezone.now())

        # Convert incoming events to a comparable form (a set of tuples)
        incoming_event_signatures = {
            (event['title'], event['datetime'], event['duration'], event['official'])
            for event in events
        }

        # Signature of every stored future event, computed once and reused for
        # both the delete pass and the duplicate check.
        existing_events = [
            (event, (event.title, event.datetime, event.duration, event.official.name))
            for event in future_events
        ]
        existing_event_signatures = {signature for _, signature in existing_events}

        # Delete future events that no longer appear in the incoming list
        for event, signature in existing_events:
            if signature not in incoming_event_signatures:
                self.delete_event(event, user)

        # Filter out events that already exist
        events_to_create = [
            event for event in events
            if (event['title'], event['datetime'], event['duration'], event['official'])
            not in existing_event_signatures
        ]
        print(f"Creating {len(events_to_create)} events in {self}")
        for event in events_to_create:
            # user must go by keyword: the second positional parameter of
            # add_event() is appearance_mode, not user.
            self.add_event(event, user=user)

    def add_event(self, event_data, appearance_mode=None, user=None):
        """
        Adds a new event to the calendar based on provided event details.
        This method creates a new event in the calendar using the specified event details.
        It then creates an EventAttendees row for every contact related to this calendar,
        checks scheduling conflicts for contacts that have a user account, and refreshes
        each contact's derived calendars for the event's start time.

        Parameters:
        event_data (dict): A dictionary containing the details of the event to be created.
            Expected keys include 'title', 'datetime', 'duration', 'location', and 'official'.
            The dictionary is not modified.
        appearance_mode (optional): Stored on every EventAttendees row created for the event.
        user (User, optional): The user responsible for creating the event; forwarded to
            CalendarEvent.create_event().
        """
        # Adding event to the calendar
        from apps.spd_files.models import AssignedCase
        is_court_event = isinstance(self.parent_object, AssignedCase)

        # Shallow copy so the caller's dict does not silently gain a 'calendar' key.
        event_data = {**event_data, 'calendar': self}
        event = CalendarEvent.create_event(event_data, user=user)

        for contact in self.contact_calendared_models.all():
            case_party = None
            if is_court_event:
                # Find the case party this contact belongs to, either as the
                # party itself or as one of the party's attorneys.
                associated_case = self.parent_object
                for party in associated_case.parties.all():
                    if contact == party.party:
                        case_party = party
                    for attorney in party.attorneys.all():
                        if contact == attorney.attorney:
                            case_party = party

            # Create EventAttendees for each contact associated with this calendar
            print(f"Adding {contact} as attendee for {event}")
            EventAttendees.objects.create(
                event=event,
                contact=contact,
                case_party=case_party,
                appearance_mode=appearance_mode
            )

            # Check for conflicts if the contact has a user account
            if hasattr(contact, 'user_account') and contact.user_account:
                contact.user_account.check_scheduling_conflicts(event)

            if is_court_event:
                print(f"Adding {event} to {contact}'s Court Calendar")
                contact.add_court_event(event)
            else:
                print("Not adding to court calendar")

            contact.refresh_schedule_calendar(event.datetime)
            contact.refresh_availability_calendar(event.datetime)
            contact.create_travel_events(event.datetime)
            contact.update_google_calendars(event.datetime)
Output:
Creating 1 events in Automated Court Calendar for XXXXXXX XXXXXXXXXX
Adding Client Doe, John as attendee for Telephone scheduling conference
Adding Telephone scheduling conference to Client Doe, John’s Court Calendar
Then it refreshes each attendee’s scheduling calendar. Each contact has multiple calendars that are tracked from different sources and then consolidated into a front-facing calendar, which is then uploaded to the user’s Google calendar. This is where it breaks down.
class Contact(CalendaredModel):
    """Person or business contact with its own court, schedule and availability calendars."""

    #* Model Fields
    first_name = models.CharField(max_length=50, null=True, blank=True, default=None)
    middle_name = models.CharField(max_length=50, null=True, blank=True, default=None)
    last_name_business_name = models.CharField(max_length=200)
    birth_year = models.IntegerField(null=True, blank=True, default=None)
    birth_month = models.IntegerField(null=True, blank=True, default=None)
    birth_day = models.IntegerField(null=True, blank=True, default=None)
    death_date = models.DateField(null=True, blank=True)
    doc_id = models.CharField(max_length=15, null=True, blank=True, default=None)
    bar_number = models.CharField(max_length=15, null=True, blank=True, default=None, unique=True)
    ssn = models.CharField(max_length=9, null=True, blank=True)
    suffix = models.CharField(max_length=5, null=True, blank=True, default=None)
    photo = models.ImageField(null=True, default="default-avatar.png")
    viewed = GenericRelation('oauth_app.User')
    court_calendar = models.OneToOneField(
        'calendars.Calendar', null=True, blank=True,
        on_delete=models.PROTECT, related_name='court_calendar_contact',
    )
    schedule_calendar = models.OneToOneField(
        'calendars.Calendar', null=True, blank=True,
        on_delete=models.PROTECT, related_name='schedule_calendar_contact',
    )
    # related_name spelling (sic) is kept: renaming it would change the reverse accessor.
    availability_calendar = models.OneToOneField(
        'calendars.Calendar', null=True, blank=True,
        on_delete=models.PROTECT, related_name='avilability_contact',
    )

    def refresh_schedule_calendar(self, date):
        """
        Refreshes the schedule calendar for a given date by consolidating events from the court and default calendars.
        This method first clears existing events from the schedule calendar for the specified date. It then iterates
        through events in the court and default calendars, creating corresponding events in the schedule calendar.
        Events where the contact is marked as 'not attending' are skipped. The title and notes of events are adjusted
        based on the event's appearance mode and type. After populating the schedule calendar, similar events within
        the same day are merged to simplify the calendar view.

        Parameters:
        - date (datetime.date or datetime.datetime): The day to refresh. A datetime is reduced to its
          calendar date; an aware datetime is first converted to the current time zone.

        Notes:
        - The method assumes 'appearance_mode' and 'event_type' are consistently managed within the event data.
        - Events are merged based on a predefined logic that considers their titles, types, and timing.
        """
        import datetime as dt
        from django.utils import timezone as dj_timezone
        from apps.calendars.models import CalendarEvent

        # Callers pass either a date or an event's (aware) start datetime.
        # Reduce both to one calendar date in the current time zone, which is
        # the zone the __date lookup converts the stored values to.
        date_is_datetime = isinstance(date, dt.datetime)
        if date_is_datetime:
            target_date = (dj_timezone.localtime(date) if is_aware(date) else date).date()
        else:
            target_date = date

        test_court_events = self.court_calendar.events.all()
        print(f"There are {len(test_court_events)} events in the Court Calendar. The date looking for is: {date}")
        for e in test_court_events:
            print(f"Date for {e} is {e.datetime.date()}, Aware: {is_aware(e.datetime)}")
        # A plain date has no utcoffset(), so is_aware() is only asked about datetimes.
        print(f"Date filter is aware? {date_is_datetime and is_aware(date)}")

        # NOTE(review): with USE_TZ=True the __date lookup converts time zones
        # inside the database and needs time zone definitions loaded there. If
        # it matches nothing although events exist on that day, check the
        # database's time zone tables before suspecting this code.
        self.schedule_calendar.events.filter(datetime__date=target_date).delete()
        calendars = [self.court_calendar, self.default_calendar]
        for calendar in calendars:
            events = calendar.events.filter(datetime__date=target_date)
            print(f"Adding {len(events)} events from {calendar} to Schedule Calendar")
. . . .
Output:
Added Telephone scheduling conference to contact’s court calendar at 2024-04-18 10:00:00-05:00
There are 1 events in the Court Calendar. The date looking for is: 2024-04-18 10:00:00-05:00
Date for Telephone scheduling conference is 2024-04-18, Aware: True
Date filter is aware? True
Adding 0 events from Court Calendar (Automated) to Schedule Calendar