Design and implement a key-value cache with automatic expiration. The system should allow storing key-value pairs that automatically expire after a specified duration.
This is similar to LeetCode 2622 - Cache With Time Limit.
You'll build this incrementally across multiple parts, with each part extending the previous functionality.
This problem is frequently asked at Netflix under the name "Log Rate Limiter" with a slightly different framing: given a time window (e.g., 1 minute), implement a logger that receives events, each with a name and a timestamp. If the same event name was seen within the time window, suppress it; otherwise, print it.
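A minimal sketch of that variant (the class and method names here are illustrative, not a required API):

class LogRateLimiter:
    def __init__(self, window_seconds: int = 60):
        self.window = window_seconds
        self.last_printed = {}  # event name -> timestamp when it was last printed

    def should_print(self, name: str, timestamp: int) -> bool:
        """Return True (and record the event) if this name has not been
        printed within the window; otherwise suppress it."""
        last = self.last_printed.get(name)
        if last is not None and timestamp < last + self.window:
            return False  # seen recently: suppress
        self.last_printed[name] = timestamp
        return True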
The interview typically flows through the parts below.
Implement an AutoExpireCache class with the following methods:
class AutoExpireCache:
def __init__(self):
"""Initialize the cache."""
pass
def set(self, key: str, value: Any, ttl_seconds: int) -> None:
"""
Store a key-value pair that expires after ttl_seconds.
Args:
key: The cache key
value: The value to store (can be any type, including False/None/0)
ttl_seconds: Time-to-live in seconds
Notes:
- If key already exists, override it with new value and TTL
- TTL countdown starts from the moment set() is called
"""
pass
def get(self, key: str) -> Any:
"""
Retrieve a value by key.
Args:
key: The cache key
Returns:
The value if key exists and hasn't expired, None otherwise
Notes:
- Return None for expired keys (not False, as False can be a valid value)
- This is a good time to clean up the expired entry (lazy deletion)
"""
pass
During the interview, be prepared to discuss:
- Return value for cache miss: why return None instead of False?
  - False can be a valid cached value
  - None is more Pythonic for "not found" (one alternative is sketched below)
- Should set return anything? True/False for success?
- Error handling: should we validate inputs?
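One way to sidestep the None-vs-False ambiguity entirely is a sentinel object. This is a sketch of that alternative, not part of the required API:

_MISSING = object()  # module-level sentinel; no user-stored value can be identical to it

class SentinelCache:
    """Illustrates distinguishing "not found" from a stored None/False."""
    def __init__(self):
        self._data = {}

    def set(self, key, value):
        self._data[key] = value

    def get(self, key):
        value = self._data.get(key, _MISSING)
        if value is _MISSING:
            raise KeyError(key)  # callers could also return a (found, value) tuple instead
        return value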
import time
cache = AutoExpireCache()
# Set a value with 2 second TTL
cache.set("user_session", {"user_id": 123}, 2)
# Immediately get - should return the value
print(cache.get("user_session")) # {"user_id": 123}
# Wait for expiration
time.sleep(3)
# After expiration - should return None
print(cache.get("user_session")) # None
# Test with False as a valid value
cache.set("feature_flag", False, 60)
result = cache.get("feature_flag")
print(result) # False
print(result is None) # False (it's actually the value False, not None)
The simplest approach uses a dictionary with expiration timestamps and lazy deletion (delete when accessed):
import time
from typing import Any
class AutoExpireCache:
def __init__(self):
self.cache = {} # key -> (value, expire_timestamp)
def set(self, key: str, value: Any, ttl_seconds: int) -> None:
"""O(1) time complexity."""
expire_at = time.time() + ttl_seconds
self.cache[key] = (value, expire_at)
def get(self, key: str) -> Any:
"""O(1) time complexity with lazy deletion."""
if key not in self.cache:
return None
value, expire_at = self.cache[key]
if time.time() > expire_at:
# Lazy deletion: remove expired entry when accessed
del self.cache[key]
return None
return value
Complexity Analysis:
| Method | Time | Space |
|---|---|---|
| set | O(1) | O(1) per entry |
| get | O(1) | O(1) |
Lazy deletion means we don't actively clean up expired entries. Instead, we only check and remove them when they're accessed via get().
Pros:
- Simple to implement: no background threads, no extra data structures
- set and get both stay O(1)
Cons:
- Expired entries that are never accessed again stay in memory indefinitely (demonstrated in the sketch below)
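A quick way to see the memory issue, poking at the internal cache dict of the Solution 1 class above:

import time

cache = AutoExpireCache()
for i in range(1_000_000):
    cache.set(f"key_{i}", i, 1)  # 1-second TTL

time.sleep(2)
# Every entry has logically expired, but none was accessed again,
# so the dict still holds all million tuples:
print(len(cache.cache))  # 1000000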
Interviewer: "Your current implementation has a memory issue. If we set millions of keys that are never accessed again, they'll stay in memory forever even after expiring. How would you handle this?"
Discuss and implement strategies to actively clean up expired entries.
Run a background thread that periodically scans for and removes expired entries:
import time
import threading
from typing import Any
class AutoExpireCache:
def __init__(self, cleanup_interval_seconds: int = 60):
self.cache = {} # key -> (value, expire_timestamp)
self.lock = threading.Lock()
# Start background cleanup thread
self.cleanup_interval = cleanup_interval_seconds
self.running = True
self.cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
self.cleanup_thread.start()
def set(self, key: str, value: Any, ttl_seconds: int) -> None:
"""O(1) time complexity."""
expire_at = time.time() + ttl_seconds
with self.lock:
self.cache[key] = (value, expire_at)
def get(self, key: str) -> Any:
"""O(1) time complexity with lazy deletion."""
with self.lock:
if key not in self.cache:
return None
value, expire_at = self.cache[key]
if time.time() > expire_at:
                del self.cache[key]
                return None
            return value

    def _cleanup_loop(self) -> None:
        while self.running:
            time.sleep(self.cleanup_interval)
            self._cleanup_expired()

    def _cleanup_expired(self) -> None:
        """O(N) scan: collect and delete every expired entry."""
        current_time = time.time()
        with self.lock:
            expired_keys = [
                key for key, (_, expire_at) in self.cache.items()
                if current_time > expire_at
            ]
            for key in expired_keys:
                del self.cache[key]

    def stop(self) -> None:
        """Stop the cleanup thread (may block up to one interval)."""
        self.running = False
        self.cleanup_thread.join()
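Usage is unchanged from Part 1; the only new concern is shutting the worker down cleanly. The thread is a daemon, so this mainly matters for deterministic tests:

import time

cache = AutoExpireCache(cleanup_interval_seconds=1)
cache.set("session", {"user_id": 123}, 2)
time.sleep(4)            # give the cleanup thread at least one pass
print(len(cache.cache))  # 0: the expired entry was removed without any get()
cache.stop()             # may block up to one interval while the loop wakes up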
Use a min-heap to track expiration times, allowing O(log N) cleanup:
import time
import heapq
import threading
from typing import Any
class AutoExpireCache:
def __init__(self):
self.cache = {} # key -> (value, expire_timestamp, version)
self.expiry_heap = [] # min-heap of (expire_timestamp, key, version)
self.key_version = {} # key -> current version (for invalidation)
self.lock = threading.Lock()
def set(self, key: str, value: Any, ttl_seconds: int) -> None:
"""O(log N) due to heap push."""
expire_at = time.time() + ttl_seconds
with self.lock:
# Increment version to invalidate old heap entries
version = self.key_version.get(key, 0) + 1
self.key_version[key] = version
self.cache[key] = (value, expire_at, version)
heapq.heappush(self.expiry_heap, (expire_at, key, version))
def get(self, key: str) -> Any:
"""O(1) average case."""
with self.lock:
if key not in self.cache:
return None
            value, expire_at, version = self.cache[key]
            if time.time() > expire_at:
                del self.cache[key]
                return None
            return value

    def cleanup_expired(self) -> int:
        """O(E log N) where E is the number of expired entries."""
        current_time = time.time()
        removed = 0
        with self.lock:
            while self.expiry_heap:
                expire_at, key, version = self.expiry_heap[0]
                if expire_at > current_time:
                    break  # heap minimum is still alive; nothing else has expired
                heapq.heappop(self.expiry_heap)
                if key in self.cache:
                    _, _, current_version = self.cache[key]
                    if version == current_version:
                        del self.cache[key]
                        removed += 1
        return removed
Why version tracking?
When a key is updated with a new TTL, the old heap entry becomes stale. The version number helps us identify and skip these stale entries during cleanup.
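A concrete demonstration of a stale entry being skipped, using the class above and the return value of cleanup_expired():

import time

cache = AutoExpireCache()
cache.set("k", "old", 1)   # heap gets (now+1, "k", version 1)
cache.set("k", "new", 60)  # heap gets (now+60, "k", version 2); version 1 is now stale

time.sleep(2)
# The (now+1, "k", 1) entry pops first, but its version (1) no longer matches
# the current version (2), so the key is skipped rather than deleted:
print(cache.cleanup_expired())  # 0
print(cache.get("k"))           # "new"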
| Approach | Set | Get | Cleanup | Memory Overhead |
|---|---|---|---|---|
| Lazy only | O(1) | O(1) | N/A | None |
| Periodic scan | O(1) | O(1) | O(N) | Thread |
| Min-heap | O(log N) | O(1) | O(E log N) | Heap + versions |
Interviewer: "What if memory is still a concern and we need to enforce a maximum cache size? Implement a bounded cache that evicts the least recently used entries when full."
Modify the cache to have a fixed capacity and use LRU eviction:
class AutoExpireCache:
def __init__(self, capacity: int):
"""
Initialize a bounded cache with LRU eviction.
Args:
capacity: Maximum number of entries the cache can hold
"""
pass
Both get and set should update the entry's recency. Use Python's OrderedDict, which maintains insertion order and allows moving items to the end:
import time
from collections import OrderedDict
from typing import Any
class AutoExpireCache:
def __init__(self, capacity: int):
if capacity <= 0:
raise ValueError("Capacity must be positive")
self.capacity = capacity
self.cache = OrderedDict() # key -> (value, expire_timestamp)
def set(self, key: str, value: Any, ttl_seconds: int) -> None:
"""O(1) amortized time complexity."""
expire_at = time.time() + ttl_seconds
# If key exists, remove it first (to update position)
if key in self.cache:
del self.cache[key]
# Evict LRU if at capacity
while len(self.cache) >= self.capacity:
self.cache.popitem(last=False) # Remove oldest (LRU)
# Add new entry (at end = most recently used)
self.cache[key] = (value, expire_at)
def get(self, key: str) -> Any:
"""O(1) time complexity."""
        if key not in self.cache:
            return None
        value, expire_at = self.cache[key]
        if time.time() > expire_at:
            del self.cache[key]
            return None
        # Move to end = mark as most recently used
        self.cache.move_to_end(key)
        return value

    def size(self) -> int:
        return len(self.cache)
cache = AutoExpireCache(capacity=3)
cache.set("a", 1, 60)
cache.set("b", 2, 60)
cache.set("c", 3, 60)
print(cache.size()) # 3
# Access 'a' to make it recently used
cache.get("a")
# Add new entry, 'b' is LRU and gets evicted
cache.set("d", 4, 60)
print(cache.get("a")) # 1 (still exists)
print(cache.get("b")) # None (evicted)
print(cache.get("c")) # 3 (still exists)
print(cache.get("d")) # 4 (just added)
If the interviewer wants you to implement LRU from scratch, use a doubly-linked list with a hash map:
import time
from typing import Any
class Node:
def __init__(self, key: str, value: Any, expire_at: float):
self.key = key
self.value = value
self.expire_at = expire_at
self.prev = None
self.next = None
class AutoExpireCache:
def __init__(self, capacity: int):
self.capacity = capacity
self.cache = {} # key -> Node
# Dummy head and tail for easier list manipulation
self.head = Node("", None, 0) # Most recently used
self.tail = Node("", None, 0) # Least recently used
self.head.next = self.tail
self.tail.prev = self.head
def _remove(self, node: Node) -> None:
"""Remove node from doubly-linked list."""
node.prev.next = node.next
node.next.prev = node.prev
    def _add_to_head(self, node: Node) -> None:
        """Insert node right after the dummy head (most recently used)."""
        node.prev = self.head
        node.next = self.head.next
        self.head.next.prev = node
        self.head.next = node

    def _move_to_head(self, node: Node) -> None:
        """Mark node as most recently used."""
        self._remove(node)
        self._add_to_head(node)

    def _evict_lru(self) -> None:
        """Remove the least recently used node (just before the dummy tail)."""
        lru = self.tail.prev
        if lru != self.head:
            self._remove(lru)
            del self.cache[lru.key]

    def set(self, key: str, value: Any, ttl_seconds: int) -> None:
        """O(1) time complexity."""
        expire_at = time.time() + ttl_seconds
        if key in self.cache:
            node = self.cache[key]
            node.value = value
            node.expire_at = expire_at
            self._move_to_head(node)
        else:
            if len(self.cache) >= self.capacity:
                self._evict_lru()
            node = Node(key, value, expire_at)
            self.cache[key] = node
            self._add_to_head(node)

    def get(self, key: str) -> Any:
        """O(1) time complexity."""
        if key not in self.cache:
            return None
        node = self.cache[key]
        if time.time() > node.expire_at:
            self._remove(node)
            del self.cache[key]
            return None
        self._move_to_head(node)
        return node.value
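The usage example from the OrderedDict version behaves identically against this implementation; a minimal check:

cache = AutoExpireCache(capacity=2)
cache.set("a", 1, 60)
cache.set("b", 2, 60)
cache.get("a")           # touch "a" so "b" becomes least recently used
cache.set("c", 3, 60)    # at capacity: evicts "b"
print(cache.get("b"))    # None
print(cache.get("a"), cache.get("c"))  # 1 3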
Complexity Analysis:
| Method | Time | Space |
|---|---|---|
| set | O(1) | O(1) per entry |
| get | O(1) | O(1) |
The complete implementations for each part are collected below. Part 1, lazy deletion only:
import time
from typing import Any
class AutoExpireCache:
def __init__(self):
self.cache = {}
def set(self, key: str, value: Any, ttl_seconds: int) -> None:
self.cache[key] = (value, time.time() + ttl_seconds)
def get(self, key: str) -> Any:
if key not in self.cache:
return None
value, expire_at = self.cache[key]
if time.time() > expire_at:
del self.cache[key]
return None
return value
Part 2, with background cleanup:
import time
import threading
from typing import Any
class AutoExpireCache:
def __init__(self, cleanup_interval: int = 60):
self.cache = {}
self.lock = threading.Lock()
self.running = True
self.cleanup_thread = threading.Thread(
target=self._cleanup_loop,
args=(cleanup_interval,),
daemon=True
)
self.cleanup_thread.start()
def set(self, key: str, value: Any, ttl_seconds: int) -> None:
with self.lock:
self.cache[key] = (value, time.time() + ttl_seconds)
def get(self, key: str) -> Any:
with self.lock:
if key not in self.cache:
return None
value, expire_at = self.cache[key]
if time.time() > expire_at:
del self.cache[key]
return None
return value
    def _cleanup_loop(self, interval: int) -> None:
        while self.running:
            time.sleep(interval)
            self._cleanup_expired()

    def _cleanup_expired(self) -> None:
        current = time.time()
        with self.lock:
            expired = [k for k, (_, exp) in self.cache.items() if current > exp]
            for k in expired:
                del self.cache[k]
Part 3, bounded LRU cache (OrderedDict version):
import time
from collections import OrderedDict
from typing import Any
class AutoExpireCache:
def __init__(self, capacity: int):
self.capacity = capacity
self.cache = OrderedDict()
def set(self, key: str, value: Any, ttl_seconds: int) -> None:
if key in self.cache:
del self.cache[key]
while len(self.cache) >= self.capacity:
self.cache.popitem(last=False)
self.cache[key] = (value, time.time() + ttl_seconds)
def get(self, key: str) -> Any:
if key not in self.cache:
return None
value, expire_at = self.cache[key]
if time.time() > expire_at:
del self.cache[key]
return None
self.cache.move_to_end(key)
return value
- Why return None for cache miss instead of False? False can be a valid value stored in the cache; alternatives include raising KeyError or returning a (found, value) tuple.
- Thread safety considerations: use threading.Lock in Python.
- Distributed cache considerations.
- TTL Refresh on Access: should get() extend the TTL? (a sketch follows this list)
- Bulk Operations: mset(), mget() for multiple keys
- Callbacks: notify when entries expire
- Statistics: hit rate, miss rate, eviction count
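A sketch of the TTL-refresh idea as a sliding-expiration variant of the Part 1 class. The class name and the choice to reuse the original TTL on each hit are assumptions, not part of the stated requirements:

import time
from typing import Any

class SlidingTTLCache:
    """Hypothetical variant: each successful get() pushes the expiry
    out by the entry's original TTL."""
    def __init__(self):
        self.cache = {}  # key -> (value, expire_at, ttl_seconds)

    def set(self, key: str, value: Any, ttl_seconds: int) -> None:
        self.cache[key] = (value, time.time() + ttl_seconds, ttl_seconds)

    def get(self, key: str) -> Any:
        if key not in self.cache:
            return None
        value, expire_at, ttl = self.cache[key]
        if time.time() > expire_at:
            del self.cache[key]
            return None
        # Sliding expiration: every successful read renews the lease
        self.cache[key] = (value, time.time() + ttl, ttl)
        return value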
| Implementation | set | get | Space | Memory Safety |
|---|---|---|---|---|
| Lazy deletion | O(1) | O(1) | O(N) | ❌ |
| Background cleanup | O(1) | O(1) | O(N) + thread | ✅ |
| Min-heap | O(log N) | O(1) | O(N) | ✅ |
| LRU (OrderedDict) | O(1) | O(1) | O(capacity) | ✅ |
| LRU (manual) | O(1) | O(1) | O(capacity) | ✅ |