Job Scheduler
Rapua includes a simple built-in job scheduler for running periodic background tasks. The scheduler handles tasks like monthly credit top-ups, cleanup operations, and other recurring maintenance jobs.
Overview
The job scheduler is a lightweight, in-process task scheduler that runs jobs at configurable intervals. It uses goroutines for concurrent job execution and supports graceful shutdown.
Location: /internal/scheduler/job.go
Key Features:
- Concurrent job execution using goroutines
- Context-based cancellation for graceful shutdown
- Configurable scheduling functions
- Built-in logging with slog
- No external dependencies; schedules are plain Go functions rather than cron expressions
Architecture
Core Components
Scheduler
The Scheduler manages all registered jobs and their lifecycle:
type Scheduler struct {
logger *slog.Logger
jobs []*Job
ctx context.Context
cancel context.CancelFunc
}
Job
Each Job represents a scheduled task:
type Job struct {
Name string // Human-readable job name
Run func(context.Context) error // Function to execute
Next func() time.Time // Function that calculates next run time
}
Built-in Scheduling Functions
The scheduler provides two pre-configured scheduling functions:
NextDaily
Runs the job at midnight every day:
NextDaily = func() time.Time {
now := time.Now()
tomorrow := now.AddDate(0, 0, 1)
return time.Date(tomorrow.Year(), tomorrow.Month(), tomorrow.Day(), 0, 0, 0, 0, now.Location())
}
Use Cases: Daily cleanup tasks, daily report generation, stale data removal
NextFirstOfMonth
Runs the job at midnight on the first day of each month:
NextFirstOfMonth = func() time.Time {
now := time.Now()
nextMonth := time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, now.Location())
// Special case: if already on 1st at midnight, use this month
if now.Day() == 1 && now.Hour() == 0 && now.Minute() == 0 && now.Second() == 0 {
return time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, now.Location())
}
return nextMonth
}
Use Cases: Monthly credit top-ups, monthly billing cycles, monthly reports
How It Works
Job Execution Flow
1. Initialization: When a job is added, the scheduler calculates the next run time
2. Timer Setup: A timer is created for the duration until the next run
3. Waiting: The job goroutine blocks on either:
   - The timer firing (time to run)
   - The context being cancelled (shutdown signal)
4. Execution: When the timer fires, the job’s Run function is called
5. Rescheduling: After execution, the next run time is calculated and the timer is reset
6. Loop: Steps 3-5 repeat until the scheduler is stopped
Concurrency Model
- Each job runs in its own goroutine
- Jobs execute independently and don’t block each other
- All jobs share the same context for coordinated shutdown
- The scheduler uses context.WithCancel for graceful termination
Error Handling
Jobs that return errors are logged but don’t prevent rescheduling:
if err := job.Run(s.ctx); err != nil {
slog.Error("Job execution", "job", job.Name, "error", err)
} else {
slog.Info("Job completed successfully", "job", job.Name)
}
This ensures that temporary failures don’t permanently disable scheduled jobs.
Usage
Creating the Scheduler
In cmd/rapua/main.go, the scheduler is initialised during application startup:
// Create the scheduler
jobs := scheduler.NewScheduler(logger)
Registering Jobs
Jobs are registered using the AddJob method:
jobs.AddJob(
"Monthly Credit Top-Up",
monthlyCreditTopupJob.TopUpCredits,
scheduler.NextFirstOfMonth,
)
jobs.AddJob(
"Stale Credit Purchase Cleanup",
staleCreditCleanupService.CleanupStalePurchases,
scheduler.NextDaily,
)
Parameters:
- name: Descriptive name for logging
- run: Function that performs the work (must accept context.Context and return error)
- next: Function that calculates the next run time
Starting the Scheduler
After registering all jobs, start the scheduler:
jobs.Start()
This spawns a goroutine for each registered job.
Stopping the Scheduler
For graceful shutdown, call Stop():
jobs.Stop()
This cancels the shared context, causing all job goroutines to exit cleanly.
Adding a New Job
Follow these steps to add a new scheduled job:
1. Create the Service
Create a service with a method that matches the job signature:
type MyService struct {
// dependencies
}
func (s *MyService) PerformTask(ctx context.Context) error {
// Your job logic here
slog.Info("Running my scheduled task")
// Return error if something goes wrong
if err := someOperation(); err != nil {
return fmt.Errorf("failed to perform task: %w", err)
}
return nil
}
2. Initialise the Service
In cmd/rapua/main.go, initialise your service:
myService := services.NewMyService(transactor, logger)
3. Register the Job
Add your job to the scheduler:
jobs.AddJob(
"My Scheduled Task",
myService.PerformTask,
scheduler.NextDaily, // or NextFirstOfMonth, or custom function
)
4. Custom Scheduling Functions
If you need a custom schedule, create a function that returns time.Time:
// Run every 6 hours
nextSixHours := func() time.Time {
return time.Now().Add(6 * time.Hour)
}
jobs.AddJob(
"Six Hour Task",
myService.PerformTask,
nextSixHours,
)
// Run every Monday at 3 AM
nextMonday3AM := func() time.Time {
now := time.Now()
// Calculate days until next Monday
daysUntilMonday := (7 - int(now.Weekday()) + 1) % 7
if daysUntilMonday == 0 && now.Hour() >= 3 {
daysUntilMonday = 7 // Already past 3 AM on Monday, wait a week
}
nextMonday := now.AddDate(0, 0, daysUntilMonday)
return time.Date(nextMonday.Year(), nextMonday.Month(), nextMonday.Day(), 3, 0, 0, 0, now.Location())
}
jobs.AddJob(
"Weekly Monday Task",
myService.PerformTask,
nextMonday3AM,
)
Current Scheduled Jobs
Monthly Credit Top-Up
Schedule: First of every month at midnight
Function: monthlyCreditTopupJob.TopUpCredits
Purpose: Replenishes users’ free credits to their monthly limit
Details:
- Processes users in batches by credit limit tier
- Uses idempotency checks to prevent duplicate top-ups
- Creates credit adjustment logs for audit trail
- Implements retry logic with exponential backoff
Service: /internal/services/monthly_credit_topup.go
Stale Credit Purchase Cleanup
Schedule: Daily at midnight
Function: staleCreditCleanupService.CleanupStalePurchases
Purpose: Removes abandoned or failed purchase records older than 7 days
Details:
- Deletes purchases in pending or failed status
- Preserves all completed purchases regardless of age
- Uses database transactions for safe deletion
- Logs the number of records cleaned up
Service: /internal/services/stale_purchase_cleanup.go
Best Practices
Job Design
- Idempotency: Jobs should be safe to run multiple times with the same result
- Context Awareness: Always respect the context for cancellation
- Error Handling: Return errors rather than swallowing them, and design for failure: a failed job is not retried immediately but runs again at its next scheduled time
- Timeouts: Consider adding timeouts for long-running operations
- Logging: Use structured logging with relevant context
Example: Idempotent Job
func (s *Service) ProcessRecords(ctx context.Context) error {
// Check if already processed this period
lastRun, err := s.getLastRunTime(ctx)
if err != nil {
return fmt.Errorf("failed to check last run: %w", err)
}
if lastRun.After(time.Now().Add(-24 * time.Hour)) {
slog.Info("Already processed in last 24 hours, skipping")
return nil
}
// Perform work...
// Update last run time
return s.setLastRunTime(ctx, time.Now())
}
Example: Context-Aware Job
func (s *Service) ProcessBatch(ctx context.Context) error {
items, err := s.getItems(ctx)
if err != nil {
return err
}
for _, item := range items {
// Check for cancellation
select {
case <-ctx.Done():
return ctx.Err()
default:
if err := s.processItem(ctx, item); err != nil {
slog.Error("Failed to process item", "item", item.ID, "error", err)
// Continue processing other items
}
}
}
return nil
}
Testing
Unit Testing Jobs
Test your job functions independently:
func TestMyService_PerformTask(t *testing.T) {
service := setupService(t)
ctx := context.Background()
err := service.PerformTask(ctx)
require.NoError(t, err)
// Verify expected outcomes
}
Testing Idempotency
func TestMyService_PerformTask_Idempotency(t *testing.T) {
service := setupService(t)
ctx := context.Background()
// Run once
err := service.PerformTask(ctx)
require.NoError(t, err)
// Run again - should be safe
err = service.PerformTask(ctx)
require.NoError(t, err)
// Verify only one set of changes occurred
}
Testing Context Cancellation
func TestMyService_PerformTask_ContextCancellation(t *testing.T) {
service := setupService(t)
ctx, cancel := context.WithCancel(context.Background())
cancel() // Cancel immediately
err := service.PerformTask(ctx)
// Should handle cancellation gracefully
if err != nil {
require.ErrorIs(t, err, context.Canceled)
}
}
Monitoring
Log Output
The scheduler produces structured logs for job lifecycle:
INFO Starting job job=Monthly Credit Top-Up nextRun=2025-11-01T00:00:00+13:00
INFO Executing job job=Monthly Credit Top-Up
INFO Job completed successfully job=Monthly Credit Top-Up
INFO Next run scheduled job=Monthly Credit Top-Up nextRun=2025-12-01T00:00:00+13:00
Error Logs
Failed jobs produce error logs:
ERROR Job execution job=Stale Credit Purchase Cleanup error=failed to connect to database
Troubleshooting
Job Not Running
Check:
- Verify job is registered before jobs.Start() is called
- Check logs for “Starting job” message
- Verify the Next function returns future timestamps
- Ensure the scheduler hasn’t been stopped
Job Running Multiple Times
Causes:
- Job is registered multiple times
- Multiple instances of the application running
- Next function returning past timestamps
Solution: Add idempotency checks to your job function
Jobs Not Stopping on Shutdown
Check:
- Verify jobs.Stop() is called during shutdown
- Ensure job functions respect context cancellation
- Add timeouts to long-running operations
Timezone Issues
The scheduler uses the system’s local timezone. For consistent behavior across deployments:
// Use UTC for all scheduling
nextRunUTC := func() time.Time {
now := time.Now().UTC()
tomorrow := now.AddDate(0, 0, 1)
return time.Date(tomorrow.Year(), tomorrow.Month(), tomorrow.Day(), 0, 0, 0, 0, time.UTC)
}