Lab
Concurrent Task Scheduler
25 min · Intermediate
Instructions
Objective
Build a concurrent task scheduler that safely manages work across multiple worker threads. This exercise tests your understanding of thread safety, producer-consumer patterns, priority queues, and graceful shutdown — all critical topics in backend engineering interviews.
Requirements
1. Thread-Safe Bounded Queue (Producer-Consumer)
- Implement a TaskQueue with a configurable maximum capacity (e.g., 100 tasks)
- submit(task): blocks if the queue is full (back-pressure). Returns false if the scheduler is shutting down.
- take(): blocks if the queue is empty. Returns None/null when the scheduler is shutting down and the queue is drained.
- Must be safe for concurrent access from multiple producer and consumer threads
- Use a mutex + condition variables (not busy-waiting)
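The bounded-buffer requirement can be sketched as follows. This is a minimal single-process illustration, not the required solution; the class and attribute names (BoundedQueue, _not_full, _not_empty) are illustrative choices, and shutdown handling is omitted here for brevity:

```python
import threading
from collections import deque

class BoundedQueue:
    """Illustrative bounded FIFO guarded by one lock and two condition variables."""

    def __init__(self, max_size):
        self._items = deque()
        self._max_size = max_size
        self._lock = threading.Lock()
        self._not_full = threading.Condition(self._lock)   # producers wait here
        self._not_empty = threading.Condition(self._lock)  # consumers wait here

    def submit(self, item):
        with self._lock:
            while len(self._items) >= self._max_size:
                self._not_full.wait()          # block instead of busy-waiting
            self._items.append(item)
            self._not_empty.notify()           # wake one waiting consumer
            return True

    def take(self):
        with self._lock:
            while not self._items:
                self._not_empty.wait()
            item = self._items.popleft()
            self._not_full.notify()            # wake one waiting producer
            return item
```

Note the `while` loops around each `wait()`: condition variables can wake spuriously, so the predicate must be re-checked after every wakeup.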
2. Priority Scheduling
- Tasks have three priority levels: HIGH, MEDIUM, LOW
- Higher-priority tasks must be dequeued before lower-priority tasks
- Tasks within the same priority level should be processed in FIFO order
- The underlying data structure should support O(log n) insertion and O(log n) extraction
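A binary heap keyed by (priority, submission_order) gives the required O(log n) insertion and extraction while preserving FIFO order within a priority level. A sketch using Python's heapq (a plain data structure, so it is within the constraints; the names push/pop and the module-level state are illustrative only):

```python
import heapq
import itertools

HIGH, MEDIUM, LOW = 0, 1, 2      # lower number = higher priority

_counter = itertools.count()     # monotonic tie-breaker: FIFO within a priority
heap = []

def push(priority, task):
    # (priority, order) tuples sort by priority first, then submission order
    heapq.heappush(heap, (priority, next(_counter), task))

def pop():
    priority, order, task = heapq.heappop(heap)
    return task

push(LOW, "t2")
push(HIGH, "t1")
push(MEDIUM, "t3")
# pops in order: t1 (HIGH), t3 (MEDIUM), t2 (LOW)
```

The counter matters: without it, two tasks of equal priority would be compared by their payloads, which may not be orderable, and FIFO within a level would not be guaranteed.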
3. Worker Pool
- Create a pool of N worker threads/goroutines that consume tasks from the queue
- Each worker loops: take a task from the queue, execute it, record metrics
- Workers must handle task panics/exceptions gracefully without crashing the entire pool
4. Graceful Shutdown
- shutdown(): stop accepting new tasks, but drain all remaining tasks in the queue
- Workers finish their current in-progress task and process all queued tasks before exiting
- shutdown() should block the caller until all workers have completed
- No tasks should be dropped or left unprocessed after shutdown completes
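The reject-new-but-drain-old behavior hinges on how take() treats the shutdown flag. A stripped-down sketch (unbounded queue, no worker pool, names like MiniScheduler assumed for illustration) of just that interaction:

```python
import heapq
import threading

class MiniScheduler:
    """Sketch of the graceful-shutdown handshake only; bounding and workers omitted."""

    def __init__(self):
        self._heap = []
        self._lock = threading.Lock()
        self._not_empty = threading.Condition(self._lock)
        self._shutting_down = False

    def submit(self, priority, order, task):
        with self._lock:
            if self._shutting_down:
                return False               # reject: shutdown already initiated
            heapq.heappush(self._heap, (priority, order, task))
            self._not_empty.notify()
            return True

    def take(self):
        with self._lock:
            # Keep serving queued tasks during shutdown; only exit once drained.
            while not self._heap and not self._shutting_down:
                self._not_empty.wait()
            if not self._heap:             # shutting down AND fully drained
                return None
            return heapq.heappop(self._heap)[2]

    def shutdown(self):
        with self._lock:
            self._shutting_down = True
            self._not_empty.notify_all()   # wake every blocked consumer to re-check
```

In the full solution, shutdown() would additionally join each worker thread so the caller blocks until the pool has exited. The notify_all() is essential: a single notify() could leave other blocked workers sleeping forever.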
5. Metrics Collection
- Track these metrics in a thread-safe manner:
- Throughput: total tasks completed
- Average latency: average time from task submission to completion
- Queue depth: current number of tasks waiting in the queue
- Rejection count: number of tasks rejected (submitted after shutdown)
- Provide a get_metrics() method that returns a snapshot of all metrics
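One way to keep the metrics consistent is a small collector whose counters all live under one lock, with the average computed at read time from a running latency sum. A sketch (the class and method names are assumptions, not part of the required API):

```python
import threading
import time

class MetricsCollector:
    """Illustrative thread-safe metrics; a single lock guards every counter."""

    def __init__(self):
        self._lock = threading.Lock()
        self._completed = 0
        self._rejected = 0
        self._total_latency_ms = 0.0   # running sum; average derived on read

    def record_completion(self, submitted_at):
        with self._lock:
            self._completed += 1
            self._total_latency_ms += (time.monotonic() - submitted_at) * 1000.0

    def record_rejection(self):
        with self._lock:
            self._rejected += 1

    def snapshot(self, queue_depth):
        with self._lock:               # read under the lock for a consistent view
            avg = self._total_latency_ms / self._completed if self._completed else 0.0
            return {
                "total_completed": self._completed,
                "average_latency_ms": avg,
                "current_queue_depth": queue_depth,
                "total_rejected": self._rejected,
            }
```

Using time.monotonic() rather than time.time() for latency avoids skew if the wall clock is adjusted while tasks are in flight.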
Design Constraints
- You may NOT use language-specific high-level concurrent queues (e.g., Java's PriorityBlockingQueue, Python's queue.PriorityQueue, Go's channels as the primary queue). Build the synchronization yourself using mutexes and condition variables to demonstrate understanding.
- You MAY use atomic operations for simple counters.
- You MAY use a heap/priority queue data structure for ordering — just wrap it with your own synchronization.
Data Structures
# Task priority levels
class Priority:
HIGH = 0 # lower number = higher priority
MEDIUM = 1
LOW = 2
# A task to be scheduled
class Task:
id: str
priority: Priority
payload: Callable # the function to execute
submitted_at: float # timestamp when submitted
# Metrics snapshot
class Metrics:
total_completed: int
average_latency_ms: float
current_queue_depth: int
total_rejected: int
Expected API
# Create scheduler with 4 workers and max queue size of 100
scheduler = TaskScheduler(num_workers=4, max_queue_size=100)
# Start the worker pool
scheduler.start()
# Submit tasks (blocks if queue is full)
scheduler.submit(Task(id="t1", priority=Priority.HIGH, payload=lambda: process_payment()))
scheduler.submit(Task(id="t2", priority=Priority.LOW, payload=lambda: send_email()))
scheduler.submit(Task(id="t3", priority=Priority.MEDIUM, payload=lambda: update_cache()))
# Processing order: t1 (HIGH) -> t3 (MEDIUM) -> t2 (LOW)
# Get current metrics
metrics = scheduler.get_metrics()
print(f"Completed: {metrics.total_completed}, Avg latency: {metrics.average_latency_ms}ms")
# Graceful shutdown — drains queue, waits for workers to finish
scheduler.shutdown()
Locking Strategy (Explain in Comments)
In your solution, add comments explaining:
- Which lock protects which shared state
- Why your design is deadlock-free (e.g., single lock, or lock ordering)
- Where condition variables are used and what they signal
- How you ensure no tasks are lost during shutdown
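As a hedged illustration of the expected style (the names mentioned are assumptions about one possible design, not required fields), such a comment block might read:

```python
# LOCKING STRATEGY (illustrative)
# - self._lock guards: self._heap, self._shutting_down, self._order_counter.
# - Single lock => no lock-ordering concerns, hence no deadlock.
# - self._not_empty: signaled by submit(); consumers wait on it while the heap is empty.
# - self._not_full: signaled by take(); producers wait on it while the heap is at capacity.
# - Shutdown safety: the flag is flipped under self._lock and notify_all() wakes every
#   waiter, so no thread sleeps forever and queued tasks are drained before workers exit.
```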
Hints
- Use a min-heap for the priority queue, keyed by (priority, submission_order) to maintain FIFO within the same priority level
- A submission_order counter (incrementing integer) breaks ties within the same priority
- Use one mutex to protect the queue + a not_empty condition variable for consumers + a not_full condition variable for producers
- For graceful shutdown, set a shutting_down flag, signal all waiting threads, and have workers check both the flag and queue emptiness
- Wrap task execution in try/except (Python), recover (Go), or try/catch (Java) to prevent worker crashes
What to Submit
Your submission should contain one file in the editor below: a complete Python file implementing the TaskScheduler class.
Grading Rubric
Thread Safety (25 points): Queue operations (submit, take) are protected by a mutex. Condition variables are used for blocking (not busy-waiting). No race conditions on shared counters. The lock strategy is documented in comments explaining which lock protects which state.
Queue Implementation (20 points): Bounded buffer with configurable max capacity. submit() blocks when full (back-pressure using condition variable wait). take() blocks when empty. Uses a heap-based data structure for O(log n) operations. Producers and consumers correctly signal each other via condition variables.
Priority Scheduling (20 points): Three priority levels (HIGH, MEDIUM, LOW) implemented correctly. Higher-priority tasks are always dequeued before lower-priority ones. FIFO ordering is maintained within the same priority level using a monotonic counter. The heap key correctly combines priority and submission order.
Graceful Shutdown (15 points): shutdown() sets a flag to reject new submissions. All queued tasks are drained and processed before workers exit. Workers finish their current in-progress task. shutdown() blocks until all workers complete (thread join). No tasks are dropped or left unprocessed. submit() returns False after shutdown is initiated.
Metrics Collection (20 points): Tracks total_completed, average_latency_ms (submission to completion), current_queue_depth, and total_rejected in a thread-safe manner. get_metrics() returns a consistent snapshot. Task exceptions are caught without crashing workers, and failed tasks are still reflected in metrics appropriately.