4. Heaps / Priority Queues

Table of Contents

A. General Introduction

Heaps are concrete data structures, whereas priority queues are abstract data structures. An abstract data structure determines the interface, while a concrete data structure defines the implementation.1

https://en.wikipedia.org/wiki/Heap_%28data_structure%29
https://en.wikipedia.org/wiki/Heap_%28data_structure%29

In a heap tree, the value in a node is always smaller than both of its children. This is called the heap property.

Generally, heaps can be of two types

  1. Min-heap: The smallest element is always at the root of the tree.
  2. Max-heap: The largest element is always at the root of the tree.

Time Complexity

Operation Worst-case Time complexity
Build a binary heap (heapify) O(n)
Insert an element O(log n)
Delete an element O(log n)
Find the max/min O(1)
Find the kth largest element (max-heap) O(k*log(n))
Heap sort O(n*log(n))

B. Leetcode Problems

1046. Last Stone Weight

You are given an array of integers stones where stones[i] is the weight of the ith stone.

We are playing a game with the stones. On each turn, we choose the heaviest two stones and smash them together. Suppose the heaviest two stones have weights x and y with x <= y. The result of this smash is:

If x == y, both stones are destroyed, and If x != y, the stone of weight x is destroyed, and the stone of weight y has new weight y - x. At the end of the game, there is at most one stone left.

Return the weight of the last remaining stone. If there are no stones left, return 0.

Example:

Input: stones = [2,7,4,1,8,1]
Output: 1
Explanation:
We combine 7 and 8 to get 1 so the array converts to [2,4,1,1,1],
we combine 2 and 4 to get 2 so the array converts to [2,1,1,1],
we combine 2 and 1 to get 1 so the array converts to [1,1,1],
we combine 1 and 1 to get 0 so the array converts to [1] then that’s the value of the last stone.

Key Ideas:

  • the default implementation in heapq is min-heap
    • so the values are made negative to make them max-heap
  • When pushing and popping to the heap, the value is negated
Code


import heapq

class Solution:
    def lastStoneWeight(self, stones: List[int]) -> int:

        stones = [-1*stone for stone in stones]
        heapq.heapify(stones)

        while len(stones) > 1:

            s1 = -1 * heapq.heappop(stones)
            s2 = -1 * heapq.heappop(stones) # s2 <= s1

            if s1 != s2:
                new_stone = -1*(s1 - s2)
                heapq.heappush(stones, new_stone)

        stones.append(0)

        return abs(stones[0])

Time Complexity: O(nlog(n))

703. Kth Largest Element in a Stream

Design a class to find the kth largest element in a stream. Note that it is the kth largest element in the sorted order, not the kth distinct element.

Implement KthLargest class:

  • KthLargest(int k, int[] nums) Initializes the object with the integer k and the stream of integers nums.
  • int add(int val) Appends the integer val to the stream and returns the element representing the kth largest element in the stream.

Example:

Input:
[“KthLargest”, “add”, “add”, “add”, “add”, “add”]
[[3, [4, 5, 8, 2]], [3], [5], [10], [9], [4]]

Output:
[null, 4, 5, 5, 8, 8]

Explanation:
KthLargest kthLargest = new KthLargest(3, [4, 5, 8, 2]);
kthLargest.add(3); // return 4
kthLargest.add(5); // return 5
kthLargest.add(10); // return 5
kthLargest.add(9); // return 8
kthLargest.add(4); // return 8

Key Ideas:

  • Always maintain the list size of k
    • use heappop to pop the smallest (n-k) elements
  • when inserting, pop the last element if the size is greater than k
Code


import heapq

class KthLargest:

    def __init__(self, k: int, nums: List[int]):

        self.nums = nums
        self.k = k

        heapq.heapify(self.nums) # O(n)
        while len(self.nums) > self.k:
            heapq.heappop(self.nums)

    def add(self, val: int) -> int:

        heapq.heappush(self.nums, val)
        if len(self.nums) > self.k:
            heapq.heappop(self.nums)

        return self.nums[0] # using a min-heap

Time complexity:

  • The constructor’s time complexity would be O(nlog(n) + (n-k)log(n)) -> O(nlog(n))
  • The add’s time complexity would be O(log(n)), as the heap is always maintained at size k

973. K Closest Points to Origin

Given an array of points where points[i] = [xi, yi] represents a point on the X-Y plane and an integer k, return the k closest points to the origin (0, 0).

The distance between two points on the X-Y plane is the Euclidean distance (i.e., √(x1 - x2)^2 + (y1 - y2)^2).

You may return the answer in any order. The answer is guaranteed to be unique (except for the order that it is in).

Example:

https://leetcode.com/problems/k-closest-points-to-origin/
https://leetcode.com/problems/k-closest-points-to-origin/

Input: points = [[1,3],[-2,2]], k = 1
Output: [[-2,2]]
Explanation:
The distance between (1, 3) and the origin is sqrt(10).
The distance between (-2, 2) and the origin is sqrt(8).
Since the distance between (-2, 2) and the origin is less than the distance between (1, 3) and the origin, (-2, 2) is closer to the origin.

Key Ideas

  • compute the distance of each point to the origin and store it in a list
  • To get the k-nearest points, I tried two different methods and both seem to work
    1. Method 1: Heaps
      • Use a min-heap to store the k-nearest points
      • pop k elements from the heap and return the points
    2. Method 2: Sort
      • Sort the list of distances and return the first k elements
Code - Heap method


import heapq

class Solution:
    def kClosest(self, points: List[List[int]], k: int) -> List[List[int]]:

        # 1. Compute the euclidean distance from the origin
        # 2. heap push all points -> nlogn
        # 3. pop `k` elements from the heap -> klogn

        distances = []
        compute_distance = lambda x,y: (x**2 + y**2)**(0.5)

        for x,y in points:
            dist = compute_distance(x,y)
            heapq.heappush((dist, [x,y]))

        k_nearest = []

        for _ in range(k):
            _, pt = heapq.heappop(distances)
            k_nearest.append(pt)

        return k_nearest

Time complexity: O((n+k)log(n))

Code - Sort method

import heapq

class Solution:
    def kClosest(self, points: List[List[int]], k: int) -> List[List[int]]:

        distances = []
        compute_distance = lambda x,y: (x**2 + y**2)**(0.5)

        for x,y in points:
            dist = compute_distance(x,y)
            distances.append((dist, [x, y]))

        distances = sorted(distances, key=lambda x: x[0]) # nlogn
        k_nearest = []

        for i in range(k): # k
            _, pt = distances[i]
            k_nearest.append(pt)

        return k_nearest

Time complexity: O(nlog(n))

355. Design Twitter

Design a simplified version of Twitter where users can post tweets, follow/unfollow another user, and is able to see the 10 most recent tweets in the user’s news feed.

Implement the Twitter class:

  • Twitter() Initializes your twitter object.
  • void postTweet(int userId, int tweetId) Composes a new tweet with ID tweetId by the user userId. Each call to this function will be made with a unique tweetId.
  • List<Integer> getNewsFeed(int userId) Retrieves the 10 most recent tweet IDs in the user’s news feed. Each item in the news feed must be posted by users who the user followed or by the user themself. Tweets must be ordered from most recent to least recent.
  • void follow(int followerId, int followeeId) The user with ID followerId started following the user with ID followeeId.
  • void unfollow(int followerId, int followeeId) The user with ID followerId started unfollowing the user with ID followeeId.

Example:

Input > [“Twitter”, “postTweet”, “getNewsFeed”, “follow”, “postTweet”, “getNewsFeed”, “unfollow”, “getNewsFeed”]
[[], [1, 5], [1], [1, 2], [2, 6], [1], [1, 2], [1]]

Output [null, null, [5], null, null, [6, 5], null, [5]]

Explanation

Twitter twitter = new Twitter();
twitter.postTweet(1, 5);
// User 1 posts a new tweet (id = 5). twitter.getNewsFeed(1); // User 1’s news feed should return a list with 1 tweet id -> [5]. return [5]
twitter.follow(1, 2); // User 1 follows user 2.
twitter.postTweet(2, 6); // User 2 posts a new tweet (id = 6).
twitter.getNewsFeed(1); // User 1’s news feed should return a list with 2 tweet ids -> [6, 5]. Tweet id 6 should precede tweet id 5 because it is posted after tweet id 5.
twitter.unfollow(1, 2); // User 1 unfollows user 2.
twitter.getNewsFeed(1); // User 1’s news feed should return a list with 1 tweet id -> [5], since user 1 is no longer following user 2.

Key Ideas:

The tricky part of the problem was the use of multiple data structures for the problem.

  • Use a hashmap to store the tweets tweeted by each userId
  • Use a hashmap to store the users who are following each userId
  • when a userId posts a tweet, add the tweet as a tuple to the hashmap with a negative priority value.
  • To generate a feed, collate all the tweets from the users who are following the userId
    • Build a heap from the list of tweets; which takes O(n)
    • Pop the top 10 elements from the heap and return the list of tweet ids; which takes O(10*logn) $\approx$ O(logn)
Code


import heapq
from collections import defaultdict

class Twitter:

    def __init__(self):
        self.usersConn = defaultdict(list)
        self.usersTweets = defaultdict(list)

        self.tweet_pr = 0

    def postTweet(self, userId: int, tweetId: int) -> None:
        self.tweet_pr -= 1
        self.usersTweets[userId].append((self.tweet_pr, tweetId))

    def getNewsFeed(self, userId: int) -> List[int]:

        if userId not in self.usersTweets and userId not in self.usersConn:
            return []

        feed = []
        tweets = self.usersTweets[userId].copy()
        for friend in self.usersConn[userId]:
            tweets.extend(self.usersTweets[friend])

        heapq.heapify(tweets)
        while len(feed) < 10 and tweets:
            obj = heapq.heappop(tweets)
            _, tweetId = obj
            feed.append(tweetId)

        return feed

    def follow(self, followerId: int, followeeId: int) -> None:
        if followeeId not in self.usersConn[followerId]:
            self.usersConn[followerId].append(followeeId) # O(1)

    def unfollow(self, followerId: int, followeeId: int) -> None:
        if followeeId in self.usersConn[followerId]:
            self.usersConn[followerId].remove(followeeId) # O(n)

621. Task Scheduler

Given a characters array tasks, representing the tasks a CPU needs to do, where each letter represents a different task. Tasks could be done in any order. Each task is done in one unit of time. For each unit of time, the CPU could complete either one task or just be idle.

However, there is a non-negative integer n that represents the cooldown period between two same tasks (the same letter in the array), that is that there must be at least n units of time between any two same tasks.

Return the least number of units of times that the CPU will take to finish all the given tasks.

Example:

Input: tasks = [“A”,“A”,“A”,“B”,“B”,“B”], n = 2
Output: 8
Explanation:
A -> B -> idle -> A -> B -> idle -> A -> B
There is at least 2 units of time between any two same tasks.

Key Ideas:

  • use a max-heap to store the tasks’s counter
  • have a while loop to simulate time
    • pop from the heap, and reduce the counter
      • add to the queue with the time at which it can be popped, with the counter
  • if the first element in the queue’s time is same as current time
    • pop the element and add it to the heap if the counter is non-zero positive
  • update the time
Code


from collections import deque, Counter
import heapq

class Solution:
    def leastInterval(self, tasks: List[str], n: int) -> int:

        time = 0
        queue = deque([])

        max_heap = [-v for _, v in Counter(tasks).items()]
        heapq.heapify(max_heap)

        while queue or max_heap:

            if max_heap:
                val = heapq.heappop(max_heap)
                val += 1

                if val < 0:
                    queue.append((time+n, val))

            if queue and queue[0][0] == time:
                _, val = queue.popleft()
                heapq.heappush(max_heap, val)

            time += 1

        return time

295. Find Median from Data Stream

Pattern: When it is required to find the median of a list, where the list grows continuously.

The median is the middle value in an ordered integer list. If the size of the list is even, there is no middle value and the median is the mean of the two middle values.

  • For example, for arr = [2,3,4], the median is 3.
  • For example, for arr = [2,3], the median is (2 + 3) / 2 = 2.5.

Implement the MedianFinder class:

  • MedianFinder() initializes the MedianFinder object.
  • void addNum(int num) adds the integer num from the data stream to the data structure.
  • double findMedian() returns the median of all elements so far. Answers within 10^-5 of the actual answer will be accepted.

Example:

Input:
[“MedianFinder”, “addNum”, “addNum”, “findMedian”, “addNum”, “findMedian”]
[[], [1], [2], [], [3], []]
Output:
[null, null, null, 1.5, null, 2.0]

Explanation

MedianFinder medianFinder = new MedianFinder();
medianFinder.addNum(1); // arr = [1]
medianFinder.addNum(2); // arr = [1, 2]
medianFinder.findMedian(); // return 1.5 (i.e., (1 + 2) / 2)
medianFinder.addNum(3); // arr[1, 2, 3]
medianFinder.findMedian(); // return 2.0

Method 1:

Key Ideas:

  • use an array to store the stream of numbers
  • appending to the array is O(1)
  • To find the median, sort the array and compute the median index based on the length of the array; O(nlogn)
Code


import heapq

class MedianFinder:

    def __init__(self):
        self.data_stream = []

    def addNum(self, num: int) -> None:
        self.data_stream.append(num) # O(1)

    def findMedian(self) -> float:

        m = len(self.data_stream)
        self.data_stream.sort() # O(nlogn)

        idx = m//2
        if m % 2 != 0:
            median = self.data_stream[idx]
        else:
            median = (self.data_stream[idx] + self.data_stream[idx-1])/2

        return median

Method 2:

Key Ideas:

  • Maintain two heaps to store the lower and upper halves of the stream of numbers
    • The lower half consists of small numbers of data stream; max-heap
    • The upper half consists of large numbers of data stream; min-heap
  • By using two heaps, we have a slight tradeoff in the time complexity of addNum to O(logn) instead of O(1)
    • By default, we always add a new number to the small number heap
    • After adding the number to the small heap, we can check for certain properties to see if we need to balance the heaps
      • Property 1: Difference between the sizes of the two heaps should be at most 1
        • If it is not satisfied, then pop and push the number to the other heap
      • Property 2: The root of the min-heap should be larger than the root of the max-heap
        • If it is not satisfied
          • pop the root of the max-heap and push it to the min-heap
          • pop the root of the min-heap and push it to the max-heap
Maintain two heaps to optimize `findMedian()`
Maintain two heaps to optimize findMedian()
  • As we have two heaps to store the stream of numbers, the time complexity of findMedian is O(1)
    • Why? Because we know that the median can be computed from the roots of two heaps, as atmost one heap will have one more element than the other.
Code


import heapq

class MedianFinder:

    def __init__(self):

        self.sn = [] # heap for small numbers
        self.ln = [] # heap for large numbers

    def addNum(self, num: int) -> None:

        # 1. Add it to sn and check if all the properties are satisfied
        # Prop 1: Difference in the sn and ln should at most 1
        # Prop 2: Max value in the sn should be less than min value in the ln

        heapq.heappush(self.sn, -num)

        if len(self.sn) > 1 + len(self.ln):
            val = -1*heapq.heappop(self.sn)
            heapq.heappush(self.ln, val)

        if len(self.ln) > 1 + len(self.sn):
            val = heapq.heappop(self.ln)
            heapq.heappush(self.sn, -val)

        if self.sn and self.ln and self.sn[0] < self.ln[0]:
            val = -1*heapq.heappop(self.sn)
            heapq.heappush(self.ln, val)

            val = heapq.heappop(self.ln)
            heapq.heappush(self.sn, -val)

    def findMedian(self) -> float:

        if len(self.sn) > len(self.ln):
            # there are odd number of elements
            return -1 * self.sn[0]
        elif len(self.ln) > len(self.sn):
            return self.ln[0]

        v1 = self.ln[0]
        v2 = -1*self.sn[0]

        return (v1+v2)/2

References


  1. https://realpython.com/python-heapq-module/ ↩︎

Previous
Next