-- mpsc-queue.lua
--- This module provides a multiple-producer single-consumer queue.
local M = {}

--- A multiple-producer single-consumer queue.
---
--- Queued values are kept in a singly linked list of nodes, so push only ever
--- appends a node (or hands the value to a waiting consumer).
--- The pop operation yields the calling task iff the queue is empty.
---
---@class MpscQueue
---@field waiting? Task The task currently suspended in pop; nil when nobody is waiting.
---@field head? MpscQueueNode Oldest queued node; nil when the queue is empty.
---@field tail? MpscQueueNode Node most recently appended by push; only meaningful while head is non-nil.
---@field push fun(self: MpscQueue, value: any)
---@field pop async fun(self: MpscQueue): any
---@field empty fun(self: MpscQueue): boolean

--- A single link of the queue's linked list.
---
---@class MpscQueueNode
---@field value any The payload passed to push.
---@field next? MpscQueueNode The node pushed right after this one, if any.

-- Table holding the queue's constructor and methods; instances reach it
-- through their metatable's `__index`.
M.MpscQueue = {}
--- Builds an empty multiple-producer single-consumer queue.
---
--- The returned table starts with no waiter and no nodes; its methods are
--- looked up on `M.MpscQueue` through the metatable's `__index`.
---
---@return MpscQueue
function M.MpscQueue.new()
    local methods = { __index = M.MpscQueue }
    -- `waiting`, `head` and `tail` all start out as nil, i.e. absent.
    return setmetatable({}, methods)
end
--- Enqueues a value, or delivers it straight to a waiting consumer.
---
--- When a task is recorded in `self.waiting`, that task is resumed with the
--- value and nothing is stored. Otherwise the value is appended to the end of
--- the linked list.
---
---@param self MpscQueue
---@param value any
function M.MpscQueue.push(self, value)
    local consumer = self.waiting
    if consumer then
        -- `self.waiting` is deliberately left alone here: pop resets it
        -- itself once its yield returns.
        consumer:resume(value)
        return
    end

    local node = { value = value, next = nil }
    if self.head ~= nil then
        -- Non-empty list: link the new node behind the current last one.
        self.tail.next = node
    else
        -- Empty list: the new node is both the first and the last one.
        self.head = node
    end
    self.tail = node
end
--- Pops the oldest value from the queue.
---
--- If a value is queued it is returned immediately. Otherwise the calling
--- task registers itself as the queue's single waiter and yields until it is
--- resumed with a value (push does this when it finds a waiter).
---
--- Raises an error when:
---   * the queue is empty and another task is already waiting on it,
---   * the queue is empty and the caller is not running inside a task,
---   * the yield reports failure; the reported value is then re-raised as-is
---     (presumably a cancellation of the waiting task; confirm against
---     `coop.task.pyield`).
---
---@async
---@param self MpscQueue
---@return any value
M.MpscQueue.pop = function(self)
    local node = self.head
    if node ~= nil then
        self.head = node.next
        if self.head == nil then
            -- The list just drained. Drop the tail reference as well so the
            -- consumed node (and its value) is not kept reachable by an idle
            -- queue until the next push overwrites `tail`.
            self.tail = nil
        end
        return node.value
    end

    if self.waiting ~= nil then
        error("Some other task is already waiting for a value.")
    end

    -- Required lazily, only on the empty-queue path.
    -- NOTE(review): presumably this avoids a load-time dependency on
    -- coop.task; confirm before hoisting it to the top of the file.
    local task = require("coop.task")
    local this = task.running()
    if this == nil then
        error("Pop must be called within a task.")
    end

    self.waiting = this
    local running, value = task.pyield()
    -- Whatever the outcome, this task is no longer waiting on the queue.
    self.waiting = nil
    if running then
        return value
    end
    -- Level 0 re-raises the reported value without adding position info.
    error(value, 0)
end
--- Reports whether the queue currently holds no queued values.
---
---@param self MpscQueue
---@return boolean
function M.MpscQueue.empty(self)
    -- The queue is empty exactly when the linked list has no first node.
    local oldest = self.head
    return oldest == nil
end
return M