Design and implement a time-based key-value data structure that stores multiple values for the same key at different timestamps and retrieves values based on temporal queries. Unlike the LeetCode version, this interview uses real timestamps (Unix epoch seconds as floats or datetime objects) rather than monotonically increasing integers. The main challenge extends beyond basic implementation to production-grade concerns: writing comprehensive tests, mocking time, ensuring timestamp ordering guarantees, and handling concurrent access with appropriate locking strategies.
Implement a TimeMap class that stores key-value pairs with real timestamp values and supports temporal queries.
TimeMap() - Initializes the data structureset(key: str, value: str, timestamp: float) - Stores the key with the given value at the specified timestamp
timestamp is a real Unix timestamp (seconds since epoch) as a floatget(key: str, timestamp: float) -> str - Returns the value where set was called previously with timestamp_prev <= timestamp
timestamp_prev"" if no such value existsfrom datetime import datetime
import time
timemap = TimeMap()
# Store values with real timestamps
t1 = time.time() # e.g., 1678901234.567
timemap.set("user:1:status", "online", t1)
time.sleep(0.1)
t2 = time.time() # e.g., 1678901234.667
# Query at current time - should return "online"
assert timemap.get("user:1:status", t2) == "online"
# Update status
t3 = time.time()
timemap.set("user:1:status", "away", t3)
# Query between t2 and t3 - still returns "online"
assert timemap.get("user:1:status", t2 + 0.01) == "online"
# Query at t3 or later - returns "away"
assert timemap.get("user:1:status", t3) == "away"
# Query before any data - returns empty
assert timemap.get("user:1:status", t1 - 100) == ""
# Non-existent key
assert timemap.get("unknown", t3) == ""
from typing import Dict, List, Tuple
import bisect
class TimeMap:
def __init__(self):
# Map from key to list of (timestamp, value) tuples
# Keep sorted by timestamp for efficient binary search
self.store: Dict[str, List[Tuple[float, str]]] = {}
def set(self, key: str, value: str, timestamp: float) -> None:
"""
Store value at timestamp for the given key.
Time: O(n) worst case due to list insertion, O(log n) for binary search
Space: O(1) per operation
"""
if key not in self.store:
self.store[key] = []
# Insert in sorted order using binary search
# This handles out-of-order timestamp arrivals
entry = (timestamp, value)
bisect.insort(self.store[key], entry)
def get(self, key: str, timestamp: float) -> str:
"""
Get value with largest timestamp <= query timestamp.
Time: O(log n) using binary search
Space: O(1)
"""
if key not in self.store:
return ""
entries = self.store[key]
idx = bisect.bisect_right(entries, (timestamp, ()))
idx == :
entries[idx - ][]
class TimeMap:
def __init__(self):
# Store timestamps and values in separate parallel lists
# Useful if values are large objects
self.store: Dict[str, Tuple[List[float], List[str]]] = {}
def set(self, key: str, value: str, timestamp: float) -> None:
if key not in self.store:
self.store[key] = ([], [])
timestamps, values = self.store[key]
# Find insertion position
idx = bisect.bisect_left(timestamps, timestamp)
timestamps.insert(idx, timestamp)
values.insert(idx, value)
def get(self, key: str, timestamp: float) -> str:
if key not in self.store:
return ""
timestamps, values = self.store[key]
# Find rightmost timestamp <= query
idx = bisect.bisect_right(timestamps, timestamp)
if idx == 0:
return ""
return values[idx - 1]
The interviewer will ask: "How would you write tests for this? How would you mock timestamps?" [Source: darkinterview.com]
This is a critical production engineering skill - writing deterministic tests for time-dependent code.
time.time() returns different values on each runMake the time source injectable for testing.
from typing import Callable, Optional
import time
class TimeMap:
def __init__(self, time_provider: Optional[Callable[[], float]] = None):
"""
Args:
time_provider: Function that returns current timestamp.
Defaults to time.time() if not provided.
"""
self.store: Dict[str, List[Tuple[float, str]]] = {}
self.time_provider = time_provider or time.time
def set(self, key: str, value: str, timestamp: Optional[float] = None) -> None:
"""
If timestamp not provided, use current time from time_provider.
"""
if timestamp is None:
timestamp = self.time_provider()
if key not in self.store:
self.store[key] = []
entry = (timestamp, value)
bisect.insort(self.store[key], entry)
def get(self, key: str, timestamp: Optional[float] = None) -> str:
"""
If timestamp not provided, query at current time.
"""
timestamp :
timestamp = .time_provider()
key .store:
entries = .store[key]
idx = bisect.bisect_right(entries, (timestamp, ()))
idx == :
entries[idx - ][]
import unittest
from unittest.mock import Mock
class TestTimeMap(unittest.TestCase):
def test_basic_set_and_get(self):
"""Test basic operations with explicit timestamps"""
tm = TimeMap()
tm.set("key1", "value1", 1000.0)
tm.set("key1", "value2", 2000.0)
tm.set("key1", "value3", 3000.0)
assert tm.get("key1", 1000.0) == "value1"
assert tm.get("key1", 1500.0) == "value1"
assert tm.get("key1", 2000.0) == "value2"
assert tm.get("key1", 2500.0) == "value2"
assert tm.get("key1", 3000.0) == "value3"
assert tm.get("key1", 500.0) == ""
def test_with_mock_time_provider(self):
"""Test using dependency injection with mock time"""
mock_time = Mock()
mock_time.return_value = 1000.0
tm = TimeMap(time_provider=mock_time)
# Set at mock time 1000
tm.set("user:status", "online")
assert tm.get() ==
mock_time.return_value =
tm.(, )
tm.get() ==
tm.get(, ) ==
():
tm = TimeMap()
tm.(, , )
tm.(, , )
tm.(, , )
tm.get(, ) ==
tm.get(, ) ==
tm.get(, ) ==
():
tm = TimeMap()
tm.(, , )
tm.(, , )
tm.(, , )
tm.get(, ) ==
tm.get(, ) ==
tm.get(, ) ==
tm.get(, ) ==
():
tm = TimeMap()
tm.get(, ) ==
tm.(, , )
tm.get(, ) ==
tm.get(, ) ==
():
tm = TimeMap()
tm.(, , )
tm.(, , )
result = tm.get(, )
result [, ]
For more comprehensive time mocking: [Source: darkinterview.com]
from freezegun import freeze_time
@freeze_time("2024-01-01 12:00:00")
def test_with_freezegun():
"""Test using freezegun for datetime mocking"""
import datetime
tm = TimeMap(time_provider=lambda: datetime.datetime.now().timestamp())
tm.set("key1", "value1") # Sets at frozen time
with freeze_time("2024-01-01 12:05:00"):
tm.set("key1", "value2")
assert tm.get("key1") == "value2"
The interviewer asks: "How would you ensure timestamps are strictly increasing for the same key?"
This is important for systems where timestamp ordering affects correctness (e.g., event sourcing, logs).
class StrictTimeMap:
def __init__(self):
self.store: Dict[str, List[Tuple[float, str]]] = {}
self.last_timestamp: Dict[str, float] = {}
def set(self, key: str, value: str, timestamp: float) -> None:
"""
Reject non-increasing timestamps for same key.
Raises ValueError if timestamp <= last timestamp for key.
"""
if key in self.last_timestamp:
if timestamp <= self.last_timestamp[key]:
raise ValueError(
f"Timestamp {timestamp} is not greater than "
f"last timestamp {self.last_timestamp[key]} for key {key}"
)
if key not in self.store:
self.store[key] = []
self.store[key].append((timestamp, value)) # Can append since strictly increasing
self.last_timestamp[key] = timestamp
class LogicalTimeMap:
def __init__(self):
self.store: Dict[str, List[Tuple[int, str]]] = {}
self.logical_clock: Dict[str, int] = {}
def set(self, key: str, value: str, wall_clock_time: float) -> int:
"""
Use logical clock that increments on each operation.
Returns the logical timestamp assigned.
"""
if key not in self.logical_clock:
self.logical_clock[key] = 0
# Increment logical clock
self.logical_clock[key] += 1
logical_time = self.logical_clock[key]
if key not in self.store:
self.store[key] = []
# Store both logical and wall clock time
self.store[key].append((logical_time, value))
return logical_time
def get(self, key: str, logical_timestamp: int) -> str:
"""Query by logical timestamp"""
if key not in self.store:
entries = .store[key]
idx = bisect.bisect_right(entries, (logical_timestamp, ()))
idx == :
entries[idx - ][]
class AdjustedTimeMap:
def __init__(self, epsilon: float = 0.001):
"""
epsilon: Minimum time increment added when timestamp collision occurs
"""
self.store: Dict[str, List[Tuple[float, str]]] = {}
self.last_timestamp: Dict[str, float] = {}
self.epsilon = epsilon
def set(self, key: str, value: str, timestamp: float) -> float:
"""
Adjust timestamp if necessary to ensure strict ordering.
Returns the actual timestamp used (may be adjusted).
"""
if key in self.last_timestamp:
if timestamp <= self.last_timestamp[key]:
# Adjust to be slightly after last timestamp
adjusted_timestamp = self.last_timestamp[key] + self.epsilon
timestamp = adjusted_timestamp
if key not in self.store:
self.store[key] = []
self.store[key].append((timestamp, value))
self.last_timestamp[key] = timestamp
return timestamp
def get(self, key: str, timestamp: float) -> :
key .store:
entries = .store[key]
idx = bisect.bisect_right(entries, (timestamp, ()))
idx == :
entries[idx - ][]
def test_strict_timestamp_enforcement():
"""Test that non-increasing timestamps are rejected"""
tm = StrictTimeMap()
tm.set("key1", "value1", 1000.0)
tm.set("key1", "value2", 2000.0) # OK: increasing
with pytest.raises(ValueError):
tm.set("key1", "value3", 1500.0) # Error: not increasing
with pytest.raises(ValueError):
tm.set("key1", "value4", 2000.0) # Error: equal to last
def test_timestamp_adjustment():
"""Test automatic timestamp adjustment"""
tm = AdjustedTimeMap(epsilon=0.001)
t1 = tm.set("key1", "value1", 1000.0)
assert t1 == 1000.0
# Try to set with earlier timestamp
t2 = tm.set("key1", "value2", 999.0)
assert t2 == 1000.001 # Adjusted
# Verify ordering
assert tm.get("key1", 1000.0) == "value1"
assert tm.get("key1", 1000.001) == "value2"
The interviewer asks: "How would you handle concurrent access? What locking strategies would you use?" [Source: darkinterview.com]
This tests understanding of thread safety, concurrency primitives, and performance trade-offs.
set(): Multiple threads inserting at same positionimport threading
from typing import Dict, List, Tuple
import bisect
class ThreadSafeTimeMap:
def __init__(self):
self.store: Dict[str, List[Tuple[float, str]]] = {}
self.lock = threading.Lock()
def set(self, key: str, value: str, timestamp: float) -> None:
"""
Thread-safe set with global lock.
Pros: Simple, correct, prevents all races
Cons: Serializes all operations, poor scalability
"""
with self.lock:
if key not in self.store:
self.store[key] = []
entry = (timestamp, value)
bisect.insort(self.store[key], entry)
def get(self, key: str, timestamp: float) -> str:
"""
Thread-safe get with global lock.
"""
with self.lock:
if key not in self.store:
return ""
entries = self.store[key]
idx = bisect.bisect_right(entries, (timestamp, ()))
idx == :
entries[idx - ][]
Analysis:
from collections import defaultdict
import threading
class PerKeyLockTimeMap:
def __init__(self):
self.store: Dict[str, List[Tuple[float, str]]] = {}
self.locks: Dict[str, threading.Lock] = defaultdict(threading.Lock)
# Separate lock for locks dict itself
self.locks_lock = threading.Lock()
def _get_lock(self, key: str) -> threading.Lock:
"""Get or create lock for key in thread-safe manner"""
# Note: defaultdict is not thread-safe for concurrent access
with self.locks_lock:
return self.locks[key]
def set(self, key: str, value: str, timestamp: float) -> None:
"""
Lock only the specific key being modified.
Pros: Operations on different keys can proceed in parallel
Cons: More complex, memory overhead for locks
"""
lock = self._get_lock(key)
with lock:
if key not in self.store:
self.store[key] = []
entry = (timestamp, value)
bisect.insort(self.store[key], entry)
def get() -> :
lock = ._get_lock(key)
lock:
key .store:
entries = .store[key]
idx = bisect.bisect_right(entries, (timestamp, ()))
idx == :
entries[idx - ][]
Analysis: [Source: darkinterview.com]
from threading import RLock
from readerwriterlock import rwlock
class RWLockTimeMap:
def __init__(self):
self.store: Dict[str, List[Tuple[float, str]]] = {}
# Reader-writer lock: multiple readers OR single writer
self.rwlock = rwlock.RWLockFair()
def set(self, key: str, value: str, timestamp: float) -> None:
"""
Acquire write lock for modifications.
Blocks all readers and writers.
"""
with self.rwlock.gen_wlock():
if key not in self.store:
self.store[key] = []
entry = (timestamp, value)
bisect.insort(self.store[key], entry)
def get(self, key: str, timestamp: float) -> str:
"""
Acquire read lock for queries.
Multiple threads can read simultaneously.
"""
with self.rwlock.gen_rlock():
if key not in self.store:
return ""
entries = self.store[key]
idx = bisect.bisect_right(entries, (timestamp, ()))
idx == :
entries[idx - ][]
Analysis:
import threading
from typing import Dict, Tuple, List
class LockFreeTimeMap:
def __init__(self):
# Each key maps to an immutable tuple of (timestamp, value) tuples
# Updates create new tuple, use atomic reference swap
self.store: Dict[str, Tuple[Tuple[float, str], ...]] = {}
self.lock = threading.Lock() # Only for initial key creation
def set(self, key: str, value: str, timestamp: float) -> None:
"""
Copy-on-write: create new version of list for key.
Pros: Reads are lock-free, snapshot isolation
Cons: High memory usage, writes copy entire history
"""
while True:
# Get current version
current = self.store.get(key, ())
# Find insertion point
entries_list = list(current)
idx = bisect.bisect_left(entries_list, (timestamp, ""))
entries_list.insert(idx, (timestamp, value))
new_entries = tuple(entries_list)
# Atomic compare-and-swap (simplified with lock)
# In production, use threading.atomic or lockless data structures
with self.lock:
if self.store.get(key, ()) == current:
self.store[key] = new_entries
() -> :
entries = .store.get(key, ())
entries:
idx = bisect.bisect_right(entries, (timestamp, ()))
idx == :
entries[idx - ][]
Analysis:
| Strategy | Read Scalability | Write Scalability | Memory Overhead | Complexity |
|---|---|---|---|---|
| Global Lock | Low | Low | Minimal | Simple |
| Per-Key Lock | High (different keys) | High (different keys) | Medium (locks per key) | Medium |
| Read-Write Lock | Very High | Low | Low | Medium |
| Lock-Free | Highest | Medium | High (copies) | High |
import concurrent.futures
import threading
def test_concurrent_access():
"""Test thread safety with concurrent operations"""
tm = ThreadSafeTimeMap()
def writer(thread_id):
for i in range(100):
tm.set(f"key{thread_id % 5}", f"value{i}", 1000.0 + i)
def reader(thread_id):
results = []
for i in range(100):
result = tm.get(f"key{thread_id % 5}", 1050.0)
results.append(result)
return results
# Run 10 writers and 10 readers concurrently
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
writer_futures = [executor.submit(writer, i) for i in range(10)]
reader_futures = [executor.submit(reader, i) for i in range(10)]
# Wait for completion
concurrent.futures.wait(writer_futures + reader_futures)
# Verify data integrity
for key_id in range(5):
entries = tm.store[f"key{key_id}"]
# Check timestamps are sorted
timestamps = [t for t, v entries]
timestamps == (timestamps),
():
tm = ThreadSafeTimeMap()
barrier = threading.Barrier()
():
barrier.wait()
tm.(, , + thread_id)
threads = [threading.Thread(target=concurrent_set, args=(i,)) i ()]
t threads:
t.start()
t threads:
t.join()
entries = tm.store[]
(entries) == ,
Basic Implementation: [Source: darkinterview.com]
set(): O(n) worst case due to list insertion after binary searchget(): O(log n) for binary searchOptimizations:
deque or custom data structure if append-only (strictly increasing timestamps)