Backend Engineer Interviews: Databases, APIs & Distributed Systems Mastery
Lab

Concurrent Task Scheduler

25 min
Intermediate
3 Free Attempts

Instructions

Objective

Build a concurrent task scheduler that safely manages work across multiple worker threads. This exercise tests your understanding of thread safety, producer-consumer patterns, priority queues, and graceful shutdown — all critical topics in backend engineering interviews.

Requirements

1. Thread-Safe Bounded Queue (Producer-Consumer)

  • Implement a TaskQueue with a configurable maximum capacity (e.g., 100 tasks)
  • submit(task): Blocks if queue is full (back-pressure). Returns false if the scheduler is shutting down.
  • take(): Blocks if queue is empty. Returns None/null when the scheduler is shutting down and the queue is drained.
  • Must be safe for concurrent access from multiple producer and consumer threads
  • Use a mutex + condition variables (not busy-waiting)
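  One possible shape for this class (a sketch, not the required implementation — the internal field names and the simplified submit(priority, task) signature are assumptions; the spec's submit takes a Task object):

  ```python
  import heapq
  import threading

  class TaskQueue:
      """Bounded, thread-safe priority queue built on a mutex + two condition variables."""

      def __init__(self, max_size):
          self._max_size = max_size
          self._heap = []                    # (priority, seq, task) tuples
          self._seq = 0                      # monotonic counter: FIFO within a priority
          self._lock = threading.Lock()      # protects _heap, _seq, _shutting_down
          self._not_empty = threading.Condition(self._lock)  # signalled when a task arrives
          self._not_full = threading.Condition(self._lock)   # signalled when space frees up
          self._shutting_down = False

      def submit(self, priority, task):
          with self._lock:
              while len(self._heap) >= self._max_size and not self._shutting_down:
                  self._not_full.wait()      # back-pressure: block the producer
              if self._shutting_down:
                  return False               # reject submissions after shutdown starts
              heapq.heappush(self._heap, (priority, self._seq, task))
              self._seq += 1
              self._not_empty.notify()       # wake one waiting consumer
              return True

      def take(self):
          with self._lock:
              while not self._heap and not self._shutting_down:
                  self._not_empty.wait()     # block the consumer until work arrives
              if not self._heap:
                  return None                # shutting down and fully drained
              _, _, task = heapq.heappop(self._heap)
              self._not_full.notify()        # wake one waiting producer
              return task

      def shutdown(self):
          with self._lock:
              self._shutting_down = True
              self._not_empty.notify_all()   # wake blocked consumers so they can exit
              self._not_full.notify_all()    # wake blocked producers so they can return False
  ```

  Note that both wait() calls sit inside while loops, not if statements: a woken thread must re-check its condition, since another thread may have consumed the slot or the task first (spurious wakeups are also possible).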

2. Priority Scheduling

  • Tasks have three priority levels: HIGH, MEDIUM, LOW
  • Higher-priority tasks must be dequeued before lower-priority tasks
  • Tasks within the same priority level should be processed in FIFO order
  • The underlying data structure should support O(log n) insertion and O(log n) extraction
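  A binary heap keyed by (priority, submission_order) satisfies all three bullets at once, because Python compares tuples element by element. A quick illustration (task names are hypothetical):

  ```python
  import heapq

  # Two MEDIUM (1) and two HIGH (0) tasks, submitted in interleaved order
  submissions = [(1, "m1"), (0, "h1"), (1, "m2"), (0, "h2")]

  heap = []
  for seq, (priority, name) in enumerate(submissions):
      heapq.heappush(heap, (priority, seq, name))   # O(log n) insertion

  order = [heapq.heappop(heap)[2] for _ in range(len(heap))]  # O(log n) extraction
  print(order)  # ['h1', 'h2', 'm1', 'm2'] — HIGH first, FIFO within each level
  ```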

3. Worker Pool

  • Create a pool of N worker threads/goroutines that consume tasks from the queue
  • Each worker loops: take a task from the queue, execute it, record metrics
  • Workers must handle task panics/exceptions gracefully without crashing the entire pool
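  The worker body can be a small loop along these lines (a sketch; it assumes a queue whose take() returns None on drained shutdown, as specified above, and a plain dict guarded by a lock for metrics):

  ```python
  import threading
  import time

  def worker_loop(queue, metrics, metrics_lock):
      """Consume tasks until the queue reports shutdown-and-drained (None)."""
      while True:
          task = queue.take()
          if task is None:            # scheduler is shutting down and queue is drained
              return
          try:
              task.payload()          # run the user-supplied callable
          except Exception:
              pass                    # a failing task must not kill the worker
          finally:
              with metrics_lock:      # record completion under the metrics lock
                  metrics["total_completed"] += 1
                  metrics["latency_sum"] += time.monotonic() - task.submitted_at
  ```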

4. Graceful Shutdown

  • shutdown(): Stop accepting new tasks, but drain all remaining tasks in the queue
  • Workers finish their current in-progress task and process all queued tasks before exiting
  • shutdown() should block the caller until all workers have completed
  • No tasks should be dropped or left unprocessed after shutdown completes
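  With the queue handling the drain logic, the scheduler-level shutdown reduces to flipping the flag and joining the workers. A minimal sketch (MiniScheduler and its fields are illustrative names, not part of the required API):

  ```python
  import threading

  class MiniScheduler:
      def __init__(self, queue, num_workers, worker_fn):
          self._queue = queue
          self._workers = [threading.Thread(target=worker_fn) for _ in range(num_workers)]

      def start(self):
          for t in self._workers:
              t.start()

      def shutdown(self):
          self._queue.shutdown()   # stop accepting work; wake all blocked threads
          for t in self._workers:
              t.join()             # caller blocks until every worker has exited
  ```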

5. Metrics Collection

  • Track these metrics in a thread-safe manner:
    • Throughput: total tasks completed
    • Average latency: average time from task submission to completion
    • Queue depth: current number of tasks waiting in the queue
    • Rejection count: number of tasks rejected (submitted after shutdown)
  • Provide a get_metrics() method that returns a snapshot of all metrics
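  A small collector that satisfies the snapshot requirement might look like this (a sketch; MetricsCollector and its method names are assumptions, and queue depth is passed in by the caller since it lives under the queue's lock):

  ```python
  import threading
  from dataclasses import dataclass

  @dataclass
  class Metrics:
      total_completed: int
      average_latency_ms: float
      current_queue_depth: int
      total_rejected: int

  class MetricsCollector:
      """Thread-safe counters plus a consistent snapshot."""

      def __init__(self):
          self._lock = threading.Lock()   # one lock protects all counters below
          self._completed = 0
          self._latency_sum_ms = 0.0
          self._rejected = 0

      def record_completion(self, latency_ms):
          with self._lock:
              self._completed += 1
              self._latency_sum_ms += latency_ms

      def record_rejection(self):
          with self._lock:
              self._rejected += 1

      def snapshot(self, queue_depth):
          with self._lock:                # single acquisition -> internally consistent view
              avg = self._latency_sum_ms / self._completed if self._completed else 0.0
              return Metrics(self._completed, avg, queue_depth, self._rejected)
  ```

  Reading all counters under one lock acquisition is what makes the snapshot consistent; reading each counter separately could observe a state that never existed.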

Design Constraints

  • You may NOT use language-specific high-level concurrent queues (e.g., Java's PriorityBlockingQueue, Python's queue.PriorityQueue, Go's channels as the primary queue). Build the synchronization yourself using mutexes and condition variables to demonstrate understanding.
  • You MAY use atomic operations for simple counters.
  • You MAY use a heap/priority queue data structure for ordering — just wrap it with your own synchronization.

Data Structures

# Task priority levels
class Priority:
    HIGH = 0    # lower number = higher priority
    MEDIUM = 1
    LOW = 2

# A task to be scheduled
class Task:
    id: str
    priority: Priority
    payload: Callable    # the function to execute
    submitted_at: float  # timestamp when submitted

# Metrics snapshot
class Metrics:
    total_completed: int
    average_latency_ms: float
    current_queue_depth: int
    total_rejected: int

Expected API

# Create scheduler with 4 workers and max queue size of 100
scheduler = TaskScheduler(num_workers=4, max_queue_size=100)

# Start the worker pool
scheduler.start()

# Submit tasks (blocks if queue is full)
scheduler.submit(Task(id="t1", priority=Priority.HIGH, payload=lambda: process_payment()))
scheduler.submit(Task(id="t2", priority=Priority.LOW, payload=lambda: send_email()))
scheduler.submit(Task(id="t3", priority=Priority.MEDIUM, payload=lambda: update_cache()))

# Processing order: t1 (HIGH) -> t3 (MEDIUM) -> t2 (LOW)

# Get current metrics
metrics = scheduler.get_metrics()
print(f"Completed: {metrics.total_completed}, Avg latency: {metrics.average_latency_ms}ms")

# Graceful shutdown — drains queue, waits for workers to finish
scheduler.shutdown()

Locking Strategy (Explain in Comments)

In your solution, add comments explaining:

  1. Which lock protects which shared state
  2. Why your design is deadlock-free (e.g., single lock, or lock ordering)
  3. Where condition variables are used and what they signal
  4. How you ensure no tasks are lost during shutdown

Hints

  • Use a min-heap for the priority queue, keyed by (priority, submission_order) to maintain FIFO within the same priority level
  • A submission_order counter (incrementing integer) breaks ties within the same priority
  • Use one mutex to protect the queue + a not_empty condition variable for consumers + a not_full condition variable for producers
  • For graceful shutdown, set a shutting_down flag, signal all waiting threads, and have workers check both the flag and queue emptiness
  • Wrap task execution in try/except (Python), recover (Go), or try/catch (Java) to prevent worker crashes

What to Submit

Your submission should consist of one file in the editor below: a complete Python file implementing the TaskScheduler class.


Grading Rubric

Thread Safety (25 points): Queue operations (submit, take) are protected by a mutex. Condition variables are used for blocking (not busy-waiting). No race conditions on shared counters. The lock strategy is documented in comments explaining which lock protects which state.
Queue Implementation (20 points): Bounded buffer with configurable max capacity. submit() blocks when full (back-pressure using condition variable wait). take() blocks when empty. Uses a heap-based data structure for O(log n) operations. Producers and consumers correctly signal each other via condition variables.
Priority Scheduling (20 points): Three priority levels (HIGH, MEDIUM, LOW) implemented correctly. Higher-priority tasks are always dequeued before lower-priority ones. FIFO ordering is maintained within the same priority level using a monotonic counter. The heap key correctly combines priority and submission order.
Graceful Shutdown (15 points): shutdown() sets a flag to reject new submissions. All queued tasks are drained and processed before workers exit. Workers finish their current in-progress task. shutdown() blocks until all workers complete (thread join). No tasks are dropped or left unprocessed. submit() returns False after shutdown is initiated.
Metrics Collection (20 points): Tracks total_completed, average_latency_ms (submission to completion), current_queue_depth, and total_rejected in a thread-safe manner. get_metrics() returns a consistent snapshot. Task exceptions are caught without crashing workers, and failed tasks are still reflected in metrics appropriately.
