"""Tests for APScheduler task scheduler. T55: Unit tests for the task scheduler implementation. """ import pytest from unittest.mock import Mock, patch, MagicMock from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.interval import IntervalTrigger from apscheduler.triggers.cron import CronTrigger @pytest.mark.unit class TestScheduler: """Test suite for scheduler singleton and decorator.""" def test_get_scheduler_returns_singleton(self): """Test that get_scheduler returns the same instance.""" # Arrange & Act from openrouter_monitor.tasks.scheduler import get_scheduler, _scheduler # First call should create scheduler scheduler1 = get_scheduler() scheduler2 = get_scheduler() # Assert assert scheduler1 is scheduler2 assert isinstance(scheduler1, AsyncIOScheduler) assert scheduler1.timezone.zone == 'UTC' def test_get_scheduler_creates_new_if_none(self): """Test that get_scheduler creates scheduler if None.""" # Arrange from openrouter_monitor.tasks import scheduler as scheduler_module # Reset singleton original_scheduler = scheduler_module._scheduler scheduler_module._scheduler = None try: # Act scheduler = scheduler_module.get_scheduler() # Assert assert scheduler is not None assert isinstance(scheduler, AsyncIOScheduler) finally: # Restore scheduler_module._scheduler = original_scheduler def test_scheduled_job_decorator_registers_job(self): """Test that @scheduled_job decorator registers a job.""" # Arrange from openrouter_monitor.tasks.scheduler import get_scheduler, scheduled_job scheduler = get_scheduler() initial_job_count = len(scheduler.get_jobs()) # Act @scheduled_job(IntervalTrigger(hours=1), id='test_job') async def test_task(): """Test task.""" pass # Assert jobs = scheduler.get_jobs() assert len(jobs) == initial_job_count + 1 # Find our job job = scheduler.get_job('test_job') assert job is not None assert job.func == test_task def test_scheduled_job_with_cron_trigger(self): """Test @scheduled_job with CronTrigger.""" # 
Arrange from openrouter_monitor.tasks.scheduler import get_scheduler, scheduled_job scheduler = get_scheduler() # Act @scheduled_job(CronTrigger(hour=2, minute=0), id='daily_job') async def daily_task(): """Daily task.""" pass # Assert job = scheduler.get_job('daily_job') assert job is not None assert isinstance(job.trigger, CronTrigger) def test_init_scheduler_starts_scheduler(self): """Test that init_scheduler starts the scheduler.""" # Arrange from openrouter_monitor.tasks.scheduler import init_scheduler, get_scheduler scheduler = get_scheduler() with patch.object(scheduler, 'start') as mock_start: # Act init_scheduler() # Assert mock_start.assert_called_once() def test_shutdown_scheduler_stops_scheduler(self): """Test that shutdown_scheduler stops the scheduler.""" # Arrange from openrouter_monitor.tasks.scheduler import shutdown_scheduler, get_scheduler scheduler = get_scheduler() with patch.object(scheduler, 'shutdown') as mock_shutdown: # Act shutdown_scheduler() # Assert mock_shutdown.assert_called_once_with(wait=True) def test_scheduler_timezone_is_utc(self): """Test that scheduler uses UTC timezone.""" # Arrange & Act from openrouter_monitor.tasks.scheduler import get_scheduler scheduler = get_scheduler() # Assert assert scheduler.timezone.zone == 'UTC' def test_scheduled_job_preserves_function(self): """Test that decorator preserves original function.""" # Arrange from openrouter_monitor.tasks.scheduler import scheduled_job # Act @scheduled_job(IntervalTrigger(minutes=5), id='preserve_test') async def my_task(): """My task docstring.""" return "result" # Assert - function should be returned unchanged assert my_task.__name__ == 'my_task' assert my_task.__doc__ == 'My task docstring.' 
@pytest.mark.unit
class TestSchedulerIntegration:
    """Integration tests for scheduler lifecycle.

    These tests operate on the shared scheduler returned by
    ``get_scheduler()``, so each one restores what it changed (running state,
    registered jobs) even when an assertion fails.
    """

    @pytest.mark.asyncio
    async def test_scheduler_start_stop_cycle(self):
        """Test complete scheduler start/stop cycle."""
        # Arrange
        from openrouter_monitor.tasks.scheduler import get_scheduler
        import asyncio

        scheduler = get_scheduler()

        # Act & Assert - should not raise
        scheduler.start()
        try:
            assert scheduler.running
        finally:
            # Always stop the scheduler, even if the assertion above failed,
            # so a running scheduler is not left behind for later tests.
            scheduler.shutdown(wait=True)

            # Give async loop time to process shutdown
            await asyncio.sleep(0.1)

        # Note: scheduler.running might still be True in async tests
        # due to event loop differences, but shutdown should not raise

    def test_multiple_jobs_can_be_registered(self):
        """Test that multiple jobs can be registered."""
        # Arrange
        from openrouter_monitor.tasks.scheduler import get_scheduler, scheduled_job

        scheduler = get_scheduler()
        expected_ids = ('job1', 'job2', 'job3')

        try:
            # Act
            @scheduled_job(IntervalTrigger(hours=1), id='job1')
            async def job1():
                pass

            @scheduled_job(IntervalTrigger(hours=2), id='job2')
            async def job2():
                pass

            @scheduled_job(CronTrigger(day_of_week='sun', hour=3), id='job3')
            async def job3():
                pass

            # Assert
            job_ids = [job.id for job in scheduler.get_jobs()]
            for job_id in expected_ids:
                assert job_id in job_ids
        finally:
            # Remove whatever was registered so the jobs do not leak into
            # other tests; the guard avoids masking an earlier failure.
            for job_id in expected_ids:
                if scheduler.get_job(job_id) is not None:
                    scheduler.remove_job(job_id)