Очередь с приоритетом

Напомним, что очередь с приоритетом относится к структурам данных, грубо говоря, типа «ящик»: в неё можно класть что-нибудь (по собственному выбору) и забирать что-нибудь (без возможности выбрать, что именно). Структуры такого плана возникали в нашем курсе ранее, во время обсуждения разнообразных обходов графов. Конкретно очередь с приоритетом, например, лежит в основе алгоритма Дейкстры.

Этот раздел весьма трудоёмок для чтения: в нём кода больше, чем русского языка, а код этот не слишком простой. В тот момент, когда станет категорически непонятно, что именно происходит, рекомендуется перейти к следующему разделу.

Наивная реализация

Очередь с приоритетом работает так:

class pqueue:
    def __init__(self):
        self.data = []
    
    ## этот метод вызывается стандартной функцией len
    def __len__(self):  
        return len(self.data)

    def put(self, priority, datum):
        self.data.append( (priority, datum) )

    def take(self):
        min_priority = min([x[0] for x in self.data])
        
        for i, x in enumerate(self.data):
            if x[0] == min_priority:
                self.data = self.data[:i]+self.data[i+1:]
                return x

Эта реализация хороша в качестве описания того, как именно работает очередь с приоритетом. Но она обычно плохо себя ведёт на практике: взятие из неё одного элемента занимает время, пропорциональное количеству хранимых элементов.

Одноразовая очередь

Будем говорить, что очередь (с приоритетом) является одноразовой, если она не поддерживает операцию добавления элемента (а вместо этого позволяет добавить все необходимые элементы при создании).

Самой простой реализацией такой очереди является просто упорядоченный массив:

class array_pqueue1:
    def __init__(self, data):
        self.data = sorted([(p,v) for p,v in data],
                           key = lambda x: -x[0]) ## в обратном порядке
    
    def __len__(self):  
        return len(self.data)

    def take(self):
        return self.data.pop()

Альтернативной реализацией является сбалансированное дерево минимумов. Под словом «сбалансированное» понимается логарифмическая зависимость между количеством элементов и высотой дерева, по ним построенного.

Обратите внимание, что нижеприведённый код использует конкретную модель дерева (а именно, S-выражения). Это связано с тем, что катаморфное описание процедуры извлечения элемента из мультимножества требует дополнительных технических деталей, существенно затрудняющих изложение.

def tree_priority(tree):
    ## (p,v) -> p
    ## [p, t1, t2, ... tn] -> p
    return tree[0]

def min_branch(trees):
    return [min(map(tree_priority,trees))] + trees

class tree_pqueue1:
    def __init__(self, data, branching_factor=8):
        trees = [(p,v) for p,v in data]
        self.len = len(trees)
        self.bf = branching_factor  ## self.bf пока не используется
        
        if len(trees) == 0:
            self.tree = None
            return

        while len(trees) > 1:
            new_trees = []
            
            for i in range(0,len(trees),branching_factor):
                to_wrap = trees[i:i+branching_factor]
                new_trees.append(min_branch(to_wrap))

            trees = new_trees

        self.tree = trees[0]

    def __len__(self):
        return self.len

    def take(self):
        ## вспомогательный рекуррентный обход, возвращающий
        ## дерево без удалённого элемента и этот элемент
        def extract(tree, priority):
            if type(tree) != list: return None, tree

            new_trees = []
            result = None
            extracted = False  ## именно из-за этой мутабельной переменной
                               ## катаморфное описание несколько затруднено
            for t in tree[1:]:
                if t == None: continue
                
                if tree_priority(t) == priority and not extracted:
                    new_tree, result = extract(t, priority)
                    extracted = True
                    if new_tree != None: new_trees.append(new_tree)
                else:
                    new_trees.append(t)

            if len(new_trees) == 0: return None, result
            return min_branch(new_trees), result

        if self.tree == None: raise RuntimeError("queue is empty")
        
        p = self.tree[0]
        self.tree, result = extract(self.tree, p)
        self.len -= 1

        return result

Динамизированный упорядоченный массив

Обе вышеописанные структуры можно динамизировать, чтобы получить эффективное добавление элементов. Но на практике лучше динамизировать упорядоченный массив — он банально проще и эффективнее.

def rev_sorted(xs):
    return sorted(xs, key=lambda x: -x[0])

class array_pqueue:
    def __init__(self):
        self.clean()

    def __len__(self):
        return self.inserted_count - self.deleted_count

    def clean(self):
        self.levels = {}
        self.inserted_count = 0
        self.deleted_count = 0

    def reconstruct(self):
        if self.inserted_count == self.deleted_count:
            return self.clean()

        elements = rev_sorted([x for l in self.levels 
                                 for x in self.levels[l]])
        level = 0
        good_size = 1
        
        while good_size < len(elements):
            level += 1
            good_size *= 2

        self.levels = { level: elements }
        self.inserted_count = len(elements)
        self.deleted_count = 0

    def put(self, priority, datum):
        new = [(priority, datum)]
        level = 0
    
        while level in self.levels:
            old = self.levels.pop(level)
            new = rev_sorted(old + new)
            level += 1
    
        self.levels[level] = new
        self.inserted_count += 1

    def take(self):
        best_level = None
        best_priority = None
        for level in self.levels:
            priority = self.levels[level][-1][0]
            if best_level == None or best_priority > priority:
                best_level = level
                best_priority = priority

        if best_level == None: raise RuntimeError("queue is empty")
        
        elements = self.levels[best_level]
        result = elements.pop()

        if len(elements) == 0: del self.levels[best_level]
        
        self.deleted_count += 1
        if self.deleted_count*2 >= self.inserted_count:
            self.reconstruct()

        return result

А вот с деревом можно произвести более интересную модификацию.

Самобалансирующееся B-дерево

Дерево называют самобалансирующимся, если оно поддерживает операцию добавления новых данных с сохранением сбалансированности (засчёт изменения своей структуры). Исторически первой разновидностью самобалансирующихся деревьев считаются деревья Адельсон-Вельского и Ландиса. На настоящий момент (2018 год) существенно более популярны разновидности т.н. B-деревьев.

B-дерево отличается от вышеописанного одноразового дерева, грубо говоря, следующим:

  • каждая ветка одноразового дерева содержит не более branching_factor поддеревьев
  • B-дерево допускает менее чем двукратное превышение этого количества; при двукратном превышении дерево «разрывается» на две части

Одноразовое дерево можно преобразовать в B-дерево, просто добавив к нему метод вставки нового элемента:

def put(self, priority, datum):
    def insert(tree, priority, datum):
        if type(tree) != list: return [tree, (priority, datum)]
        
        trees = tree[1:-1] + insert(tree[-1], priority, datum)
        if len(trees) >= 2*self.bf:
            part1 = trees[:self.bf]
            part2 = trees[self.bf:]
            return [ min_branch(part1), min_branch(part2) ]

        return [ min_branch(trees) ]
    
    if self.tree == None:
        self.tree = [priority, (priority, datum)]
    else:
        roots = insert(self.tree, priority, datum)
        if len(roots) == 1: self.tree = roots[0]
        else: self.tree = min_branch(roots)

    self.len += 1

Классическая реализация (т.н. бинарная куча)

Говоря про очередь с приоритетом, нельзя не упомянуть классическую реализацию этой структуры данных, в которой тоже присутствует дерево, но неявным образом.

Все данные хранятся в массиве, обладающим следующим свойством: для всех i элемент на i-м месте не больше (по приоритету) элементов на местах (2*i+1) и (2*i+2) (если таковые места есть).

def swap(array, i, j):
    array[i], array[j] = array[j], array[i]

class heap_pqueue:
    def __init__(self):
        self.data = []
    
    def __len__(self):
        return len(self.data)
    
    def put(self, priority, datum):
        i = len(self.data)

        ## новый элемент добавляется в конец, а затем прогоняется к началу
        self.data.append( (priority, datum) )

        while i > 0:
            parent = (i-1)//2
            if self.data[i][0] >= self.data[parent][0]: break

            swap(self.data, i, parent)
            i = parent

    def take(self):
        ## первый элемент обменивается с последним
        swap(self.data, 0, -1)
        result = self.data.pop()
        
        ## затем (бывший) последний элемент прогоняется к концу массива
        i = 0
        while True:
            a = 2*i+1
            b = 2*i+2
            if a >= len(self.data): return result

            if b < len(self.data) and self.data[b][0] < self.data[a][0]: a = b
            
            if self.data[i][0] <= self.data[a][0]: return result
            
            swap(self.data, i, a)
            i = a

Реализация очереди с приоритетом при помощи бинарной кучи по количеству кода короче почти любой другой эффективной реализации (за исключением, разве что, динамизированного массива без реконструкции). При этом она обладает неплохой производительностью, хотя и меньшей универсальностью: в отличие от бинарной кучи, как динамизированный массив, так и B-дерево (после незначительной модификации операции добавления элемента) можно использовать ещё и как словарь.

Об использовании дерева минимумов в качестве словаря (называемого в этом случае деревом поиска) будет следующий раздел, а сейчас скажем чуть-чуть о производительности всех вышеописанных подходов.

Сравнение производительности

Начнём с очень важной вещи: все тесты производительности — одна большая ложь. Реальная производительность зависит от огромного количества факторов, которые трудно исключить из сравнения. Приведённые ниже числа отражают лишь поведение конкретного кода на конкретной реализации конкретного языка программирования на конкретном компьютере.

Для других сочетаний код/язык/компилятор/компьютер относительная скорость работы алгоритмов может быть (и является) другой. Скажем лишь, что среди вышеприведённых трёх «эффективных» подходов нет однозначного лидера или же однозначного аутсайдера: для каждого из них есть обстоятельства, при которых он работает лучше других.

Теперь, собственно, опишем тесты, на которых измерялась производительность:

  1. добавление N элементов к пустой очереди
  2. удаление N элементов из заполненной очереди
  3. предварительное добавление 1000 элементов (не измерялось), а затем N-кратное добавление/удаление одного элемента

Все результаты указаны в микросекундах в пересчёте на один элемент. В каждом из тестов добавлялись элементы со случайными приоритетами.

Сначала результаты для добавления N элементов.

|алгоритм|мкс/эл N=100|мкс/эл N=1000|мкс/эл N=10000|мкс/эл N=100000|мкс/эл N=1000000| |-|-|-|-|-| |наивный|1|1|1|-|-| |массив|8|8|10|12|14| |дерево|14|22|30|38|46| |куча|3|3|3|3|3|

Для удаления N элементов.

|алгоритм|мкс/эл N=100|мкс/эл N=1000|мкс/эл N=10000|мкс/эл N=100000|мкс/эл N=1000000| |-|-|-|-|-| |наивный|15|102|987|-|-| |массив|3|3|3|4|4| |дерево|21|34|45|58|79| |куча|8|14|21|28|37|

Для продолжительной работы.

|алгоритм|мкс/эл N=100|мкс/эл N=1000|мкс/эл N=10000|мкс/эл N=100000|мкс/эл N=1000000| |-|-|-|-|-| |наивный|147|150|273|-|-| |массив|20|15|12|8|6| |дерево|59|57|73|79|83| |куча|19|19|24|25|26|