diff --git a/supervisor/coresys.py b/supervisor/coresys.py index 958209dfa..daab10f68 100644 --- a/supervisor/coresys.py +++ b/supervisor/coresys.py @@ -9,6 +9,7 @@ from datetime import UTC, datetime, tzinfo from functools import partial import logging import os +import time from types import MappingProxyType from typing import TYPE_CHECKING, Any, Self, TypeVar @@ -655,8 +656,14 @@ class CoreSys: if kwargs: funct = partial(funct, **kwargs) + # Convert datetime to event loop time base + # If datetime is in the past, delay will be negative and call_at will + # schedule the call as soon as possible. + delay = when.timestamp() - time.time() + loop_time = self.loop.time() + delay + return self.loop.call_at( - when.timestamp(), funct, *args, context=self._create_context() + loop_time, funct, *args, context=self._create_context() ) diff --git a/tests/jobs/test_job_decorator.py b/tests/jobs/test_job_decorator.py index 40bbdfae1..3cce977ec 100644 --- a/tests/jobs/test_job_decorator.py +++ b/tests/jobs/test_job_decorator.py @@ -1179,7 +1179,6 @@ async def test_job_scheduled_delay(coresys: CoreSys): async def test_job_scheduled_at(coresys: CoreSys): """Test job that schedules a job to start at a specified time.""" - dt = datetime.now() class TestClass: """Test class.""" @@ -1189,10 +1188,12 @@ async def test_job_scheduled_at(coresys: CoreSys): self.coresys = coresys @Job(name="test_job_scheduled_at_job_scheduler") - async def job_scheduler(self) -> tuple[SupervisorJob, asyncio.TimerHandle]: + async def job_scheduler( + self, scheduled_time: datetime + ) -> tuple[SupervisorJob, asyncio.TimerHandle]: """Schedule a job to run at specified time.""" return self.coresys.jobs.schedule_job( - self.job_task, JobSchedulerOptions(start_at=dt + timedelta(seconds=0.1)) + self.job_task, JobSchedulerOptions(start_at=scheduled_time) ) @Job(name="test_job_scheduled_at_job_task") @@ -1201,29 +1202,28 @@ async def test_job_scheduled_at(coresys: CoreSys): self.coresys.jobs.current.stage = "work" test = TestClass(coresys) - job_started = asyncio.Event() - job_ended = asyncio.Event() + + # Schedule job to run 0.1 seconds from now + scheduled_time = datetime.now() + timedelta(seconds=0.1) + job, _ = await test.job_scheduler(scheduled_time) + started = False + ended = False async def start_listener(evt_job: SupervisorJob): - if evt_job.uuid == job.uuid: - job_started.set() + nonlocal started + started = started or evt_job.uuid == job.uuid async def end_listener(evt_job: SupervisorJob): - if evt_job.uuid == job.uuid: - job_ended.set() + nonlocal ended + ended = ended or evt_job.uuid == job.uuid - async with time_machine.travel(dt): - job, _ = await test.job_scheduler() + coresys.bus.register_event(BusEvent.SUPERVISOR_JOB_START, start_listener) + coresys.bus.register_event(BusEvent.SUPERVISOR_JOB_END, end_listener) - coresys.bus.register_event(BusEvent.SUPERVISOR_JOB_START, start_listener) - coresys.bus.register_event(BusEvent.SUPERVISOR_JOB_END, end_listener) - - # Advance time to exactly when job should start and wait for completion - async with time_machine.travel(dt + timedelta(seconds=0.1)): - await asyncio.wait_for( - asyncio.gather(job_started.wait(), job_ended.wait()), timeout=1.0 - ) + await asyncio.sleep(0.2) + assert started + assert ended assert job.done assert job.name == "test_job_scheduled_at_job_task" assert job.stage == "work"