Стек и очередь

Рассмотрим следующий интерфейс:

empty()  ## выход: контейнер
put(c,x) ## входы: контейнер, что угодно
take(c)  ## вход: контейнер, выход: что-то
size(c)  ## вход: контейнер, выход: натуральное число

со следующим набором свойств:

empty() не имеет побочных эффектов
# это означает, что если в алгоритм добавить произвольное количество 
# подобных вызовов, результат работы алгоритма не изменится

size(empty()) == 0

size(c) не имеет побочных эффектов

# если предварительно сделать a = size(c); put(c,x), то
size(c) == a+1

# если size(c) > 0 и предварительно сделать a = size(c); take(c), то
size(c) == a-1

# для произвольного массива a, его копии b и пустого массива r,
# при начальном условии c = empty()
# и произвольной сбалансированной последовательности операций
# r.append(take(c)) и put(c, a.pop())
sorted(b) == sorted(r)

В последнем свойстве под сбалансированностью понимается, что перед любым take выполнено size(c) > 0, а после последней операции выполнено size(c) == 0.

Будем называть вышеописанный абстрактный тип данных мешком.

Внимательный читатель заметит, что функция sorted не работает для произвольного массива: далеко не всё, что угодно, можно сравнивать между собой. В связи с этим договоримся, что мы рассматриваем «идеализированную» версию sorted.

Стек и очередь

Если для любого массива a и любого подходящего c программа

b = []
for x in a: put(c,x)
for _ in range(len(a)): b.append(take(c))

приводит к тому, что a == b, то соответствующий мешок называется очередью (или FIFO: First In First Out).

Если же для любого a такая программа приводит к a == list(reversed(b)), то соответствующий мешок называется стеком (или LIFO: Last In First Out).

Для стека более традиционны названия операций push (вместо put) и pop (вместо take).

Иммутабельные версии

Вообще говоря, все вышеописанные свойства всё равно не слишком хорошо описывают стек/очередь/мешок. Например, следующая версия «стека» удовлетворяет всем вышеперечисленным свойствам:

STACKS = []
def empty(): n = len(STACKS); STACKS.append((n,[])); return STACKS[-1]
def size(c): return len(c[1])
def put(c,x): c[1].append(x)
def take(c): 
  for i, s in STACKS: if i != c[0]: s.clear()
  return c[1].pop()

но при этом обладает весьма неожиданным поведением (и сомнительной применимостью).

Поэтому вместо того, чтобы аккуратно определять мутабельность (это очень сложно), дадим иммутабельную версию стека/очереди/мешка.

# интерфейс
i_empty()
i_size(c)
i_put(c,x)
i_take(c)

# свойства
все функции не имеют побочных эффектов

i_size(i_empty()) == 0

size(i_put(c,x)) == size(c) + 1

# если size(c) > 0, то
size(i_take(c)[0]) == size(c) - 1

# для произвольного массива a, его копии b и пустого массива r,
# при начальном условии c = i_empty()
# и произвольной сбалансированной последовательности операций
# c, x = i_take(c); r.append(x) 
# и 
# c = i_put(c, a.pop())
sorted(b) == sorted(r)

# плюс для стека/очереди нужно ещё одно свойство 
# (аналогичное таковому для мутабельных версий)

А для мутабельной просто потребуем, чтобы она своим поведением была эквивалента следующей:

def empty(): return [i_empty()]
def size(c): return i_size(c[0])
def put(c,x): c[0] = i_put(c[0], x)
def take(c): c[0], x = i_take(c[0]); return x

Реализации стека

Будем использовать классические названия push и pop.

Массив

def empty():   return []
def size(c):   return len(c)
def push(c,x): c.append(x)
def pop(c):    return c.pop()

Связный список

def i_empty():   return (0, empty_list())
def i_size(c):   return c[0]
def i_push(c,x): return (c[0]+1, join(x, c[1]))
def i_pop(c):    return (c[0]-1, tail(c[1])), head(c[1])

Классические применения стека

Вычисление формул

Рассмотрим следующую функцию:

def evaluate_rec(formula):
  if type(formula) == tuple:
    op = formula[0]
    l = evaluate_rec(formula[1])
    r = evaluate_rec(formula[2])

    if op == '+': return l+r
    if op == '-': return l-r
    if op == '*': return l*r

  return formula

Она вычисляет значения формул вида ('+', 1, ('*', ('-', 2, 3) ('+', 4, 5))) (более традиционно такая формула записывается \(1+(2-3)\cdot(4+5)\)).

Стеки позволяют записать алгоритм вычисления такой формулы без использования рекурсии:

def evaluate(formula):
  formula_stack = empty()
  operator_stack = empty()

  push(formula_stack, formula)
  while True:
    top = pop(formula_stack)

    if type(top) == tuple:
      push(operator_stack, top[0])
      push(formula_stack, top[2])
      push(formula_stack, top[1])
    else:
      if size(operator_stack) == 0: return top
      op = pop(operator_stack)

      if type(op) == str:
        push(operator_stack, (op, top))
      else:
        if op[0] == '+': value = op[1] + top
        if op[0] == '-': value = op[1] - top
        if op[0] == '*': value = op[1] * top
        push(formula_stack, value)

Этот алгоритм работает следующим образом: есть стек формул и стек операций. Первый изначально содержит формулу, которую требуется вычислить, второй — пустой.

На каждом шагу достаётся элемент из стека формул.

  • Если это (неатомарная) формула, то она разбирается на операцию и подформулы: операция переносится в стек операций, подформулы кладутся обратно в стек формул.
  • Если же это число, а стек операций пустой, то это число — ответ.
  • Если стек операций не пуст, оттуда достаётся операция, в которую подставляется наше число. Результат подстановки кладётся в стек операций или же в стек формул в зависимости от того, является ли он числом или же недовычисленной операцией.

В данном случае мы кодируем частично вычисленные операции парами вида (название операции, её первый вход).

Если рассматривать процедуру вычисления формулы как обход соответствующего графа в глубину, то стек формул — это текущий фронт, а стек операций — текущий путь от начала.

Ещё один способ вычисления формул

Единственной сложностью предыдущего алгоритма была необходимость как-то кодировать частично вычисленные операции. Сейчас мы приведём альтернативный (также двухстековый) алгоритм, в некотором смысле двойстенный предыдущему:

def evaluate(formula):
  value_stack = empty()
  command_stack = empty()

  push(command_stack, formula)
  while size(command_stack) > 0:
    top = pop(command_stack)

    if type(top) == tuple:
      push(command_stack, top[0])
      push(command_stack, top[1])
      push(command_stack, top[2])
    elif type(top) == str:
      a = pop(value_stack)
      b = pop(value_stack)
      if top == '+': value = a+b
      if top == '-': value = a-b
      if top == '*': value = a*b
      push(value_stack, value)
    else:
      push(value_stack, top)

  return pop(value_stack)

В этом алгоритме мы имеем дело с преобразователями стека, которые в рамках этого алгоритма мы будем называть командами. Каждая команда каким-то образом меняет стек значений.

В частности, формулы мы отождествляем с командами, кладущими в стек значений своё значение. А операции — с командами, забирающими из стека значений свои входы и кладущими туда свой результат (на этих входах).

Нетрудно видеть, что команда, соответствующая формуле (op, a, b), эквивалентна последовательности команд b, a, op. Собственно, именно эта эквивалентность и используется алгоритмом для вычисления значения формулы.

Реализация механизма вызова функций

Попробуйте запустить следующий код:

def foo():
  a = 3
  a += bar(2, a)
  return a

def bar(x, y):
  b = (5+x)*y
  b += baz(b)
  return b

def baz(x):
  return x/0

foo()

Вы получите сообщение об ошибке вроде следующего

Traceback (most recent call last):
  File "??????.py", line 14, in <module>
    foo()
  File "??????.py", line 3, in foo
    a += bar(2, a)
  File "??????.py", line 8, in bar
    b += baz(b)
  File "??????.py", line 12, in baz
    return x/0
ZeroDivisionError: division by zero

Это — текущее содержимое стека вызовов на момент ошибки.

Стек вызовов — стек, в который операции вызова функций записывают «адрес возврата» — место программы (и текущее состояние исполнителя), куда нужно вернуться по завершении работы соответствующей функции.

В качестве иллюстрации покажем пример простейшего интерпретатора языка с присваиваниями и вызовами функций. В качестве программы этот язык будет принимать питоновский список «команд».

Например, аналогом вышеприведённой программы будет следующее:

[
  ("call", 2),             ## строчка 0
  ("stop",),               ## строчка 1
  ("set", "a", 3),         ## строчка 2: def foo()
  ("call", 6, 2, "a"),     ## строчка 3
  ("add", "a", "_result"), ## строчка 4
  ("return", "a"),         ## строчка 5
  ("set", "b", 5),         ## строчка 6: def bar(x, y)
  ("add", "b", "_0"),      ## строчка 7
  ("mul", "b", "_1"),      ## строчка 8
  ("call", 12, "b"),       ## строчка 9
  ("add", "b", "_result"), ## строчка 10
  ("return", "b"),         ## строчка 11
  ("set", "temp", "_0"),   ## строчка 12: def baz(x)
  ("div", "temp", 0),      ## строчка 13
  ("return", "temp"),      ## строчка 14
]

Сам интерпретатор может быть устроен так:

def interpret(program):
  stack     = []
  variables = {}
  line      = 0

  while 0 <= line < len(program):
    command = program[line]
    op = command[0]

    if op in COMMANDS:
      line = COMMANDS[op](stack, variables, line, command[1:])
    else:
      raise RuntimeError("Unknown command: " + op)

def evaluate(variables, literal):
  if literal in variables: return variables[literal]
  return literal

def command_set(stack, variables, line, args):
  variables[args[0]] = evaluate(variables, args[1])
  return line + 1

def command_add(stack, variables, line, args):
  variables[args[0]] += evaluate(variables, args[1])
  return line + 1

def command_mul(stack, variables, line, args):
  variables[args[0]] *= evaluate(variables, args[1])
  return line + 1

def command_div(stack, variables, line, args):
  variables[args[0]] /= evaluate(variables, args[1])
  return line + 1

def command_stop(stack, variables, line, args):
  return -1 

def command_call(stack, variables, line, args):
  old_variables = { v:variables[v] for v in variables }
  
  stack.append( (old_variables, line) )
  variables.clear()

  for i, arg in enumerate(args[1:]):
    variables["_"+str(i)] = evaluate(old_variables, arg)

  return args[0]

def command_return(stack, variables, line, args):
  result = None if len(args) < 1 else evaluate(variables, args[0])

  old_variables, line = stack.pop()
  variables.clear()
  
  for v in old_variables:
    variables[v] = old_variables[v]

  variables["_result"] = result

  return line+1


COMMANDS = {
  "call":   command_call,
  "stop":   command_stop,
  "set":    command_set,
  "add":    command_add,
  "mul":    command_mul,
  "div":    command_div,
  "return": command_return,
}

Обход «в глубину»

Предположим, что мы имеем дело со связным (ориентированным) графом, который нужно полностью обойти (формально говоря, перечислить все его вершины).

Стандартный способ обохода описан в древнегреческих мифах:

def traverse(graph, start, callback = None):
  path    = [start]
  visited = set()
  
  while path:
    current_place = path[-1]
    visited.add(current_place)

    ## можно что-нибудь сделать в вершине, если нужно
    if callback: callback(current_place)

    ## функция neighbors выдаёт список соседей указанной вершины
    to_visit = [n for n in neigbors(graph,current_place)
                  if  n not in visited]
    
    if to_visit: path.append(to_visit[0])
    else:        path.pop()

  return visited

Нетрудно увидеть здесь стек:

def traverse(graph, start, callback = None):
  path = empty()
  push(path, start)

  visited = set()
  
  while path:
    current_place = pop(path)
    push(path, current_place)
    visited.add(current_place)

    if callback: callback(current_place)

    to_visit = [n for n in neigbors(graph, current_place)
                  if  n not in visited]
    
    if to_visit: push(path,to_visit[0])
    else:        pop(path)

  return visited

Теперь перейдём к более современной версии того же обхода (к сожалению, автор не в курсе, знали ли её древние греки), основанную на другой идеологии, которая имеет полезные обобщения.

Фронтовой обход

Будем действовать следующим методом: на каждом шаге храним фронт — множество соседей тех вершин, в которых мы уже были. Если фронт непуст, забираем из него любую вершину, а вместо неё добавляем всех её непосещённых соседей. Этот алгоритм выражается так:

def traverse(graph, start, callback = None):
  front = empty()
  put(front, start)

  visited = set()

  while size(front) > 0:
    vertex = take(front)
    if vertex in visited: continue
    visited.add(vertex)
    
    if callback: callback(vertex)
    
    for n in neighbors(graph, vertex): put(front, n)

  return visited

Этот алгоритм, как можно видеть, выражается в терминах операций над произвольным мешком. От выбора реализации мешка зависит порядок обхода.

Обход «в глубину» получается, если мешок является стеком. Оставим доказательство этого утверждения в качестве простого упражнения на метод математической индукции.

Реализации очереди

Массив

Наиболее простая реализация очереди может выглядеть так:

def empty():   return []
def size(c):   return len(c)
def put(c,x):  c.append(x)
def take(c):   return c.pop(0)

Эта реализация плоха тем, что операция «забрать элемент» линейна по размеру очереди.

Циклическая очередь

При сколько-нибудь больших размерах очереди более эффективна следующая реализация:

MAX_CAPACITY = 100

def empty():  return [0, 0, [None] * MAX_CAPACITY]
def size(c):  return (c[1] - c[0]) % len(c[2])

def put(c,x):
  _, right, array = c

  if size(c) == len(array) - 1:
    raise RuntimeError("Max capacity exceeded")
  
  array[right] = x
  c[1] = (right + 1) % len(array)
  

def take(c):
  left, _, array = c

  if size(c) == 0:
    raise RuntimeError("Queue is empty")

  c[0] = (left + 1) % len(array)
  return array[left]

Расширяемая циклическая очередь

Как нетрудно видеть, предыдущая реалзиация имеет ограниченную вместимость. Это можно легко исправить альтернативным put:

def put(c,x):
  _, right, array = c

  array[right] = x
  c[1] = (right + 1) % len(array)

  if size(c) < len(array) - 1: return

  new_queue = [0, 0, [None] * (len(array)*2)]

  while size(c) > 0: put(new_queue, take(c))

  for i, _ in enumerate(c): c[i] = new_queue[i]

Классические применения очереди

Запаздывание в однопроходных алгоритмах

Рассмотрим типичную задачу из ЕГЭ: найти максимальное произведение пары элементов входного массива (все элементы — целые положительные числа), порядковые номера которых различаются хотя бы на 6, делящееся на 15.

Вот — каноническое её решение:

def process(sequence):
  queue = empty()
  maxima = [0] * 15
  answer = 0

  for x in sequence:
    put(queue, x)
    if size(queue) <= 6: continue

    y = take(queue)
    mod = y % 15
    maxima[mod] = max(maxima[mod], y)

    candidates = [ x * maxima[m]
                   for m in range(15)
                   if (m * x) % 15 == 0 ]

    if len(candidates) > 0: answer = max(answer, max(candidates))


  return answer

Общий вид этого алгоритма следующий:

def process(sequence, delay, init_state, update_a, update_b):
  queue = empty()
  a, b = init_state()

  for x in sequence:
    put(queue, x)
    if size(queue) <= delay: continue

    a = update_a(a, take(queue))
    b = update_b(a, b, x)
  
  return b

Это — разновидность редукции списка, в которой состояние делится на две компоненты, одна из которых обновляется с отставанием от другой.

По-научному такая редукция (а точнее, некоторое её обобщение) называется зигохистоморфизмом списка. Обычная же редукция называется катаморфизмом списка, а полуторакомпонентная редукция с отставанием, равным 0, называется зигоморфизмом списка.

Если быть более точным, то классические ката-, зиго- и прочие …морфизмы относятся к «правым» свёрткам: они обрабатывают список с конца. Впрочем, левая и правая свёртки очень тесно связаны: одну можно получить из другой, например, разворотом списка.

Обход «в ширину»

Недостатком обхода в глубину является чрезвычайный оппортунизм, следствием которого является, например, невозможность (потенциально) полного обхода бесконечных графов. Поэтому часто используется обход в ширину, перебирающий вершины в порядке возрастания расстояния от них до начальной.

Чтобы получить обход в ширину, достаточно в вышеприведённом фронтовом обходе взять в качестве мешка очередь.