-
Notifications
You must be signed in to change notification settings - Fork 2
/
uv-utils.lua
172 lines (148 loc) · 4.29 KB
/
uv-utils.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
--- Utilities for working with libuv.
---
--- sleep: Sleeps for a number of milliseconds.
--- StreamReader: An async wrapper for a readable uv_stream_t.
--- StreamWriter: An async wrapper for a writable uv_stream_t.
local M = {}
--- Sleeps for a number of milliseconds.
---
---@async
---@param ms number The number of milliseconds to sleep.
M.sleep = function(ms)
local uv = require("coop.uv")
local copcall = require("coop.coroutine-utils").copcall
local timer = vim.uv.new_timer()
local success, err = copcall(uv.timer_start, timer, ms, 0)
-- Safely close resources even in case of a cancellation error.
timer:stop()
timer:close()
if not success then
error(err, 0)
end
end
--- A stream reader is an async wrapper for a readable uv_stream_t.
---
--- This is useful for a subprocess’s stdout and stderr..
---
---@class StreamReader
---@field handle uv_stream_t
---@field buffer MpscQueue
---@field at_eof boolean
---@field read async fun(StreamReader): string?
---@field read_until_eof async fun(StreamReader): string
---@field close async fun(StreamReader)
M.StreamReader = {}
--- Creates a stream reader.
---
---@param handle uv_stream_t The handle to a readable stream.
---@return StreamReader stream_reader The stream writer object.
M.StreamReader.new = function(handle)
if not vim.uv.is_readable(handle) then
error("Can not create a stream reader, because the handle is not readable.")
end
local MpscQueue = require("coop.mpsc-queue").MpscQueue
local stream_reader = { handle = handle, buffer = MpscQueue.new(), at_eof = false }
vim.uv.read_start(handle, function(err, data)
if not err then
stream_reader.buffer:push(data)
end
end)
return setmetatable(stream_reader, { __index = M.StreamReader })
end
--- Creates a stream reader from a file descriptor.
---
---@param fd integer The file descriptor.
---@return StreamReader stream_reader The stream reader object.
M.StreamReader.from_fd = function(fd)
local handle = vim.uv.new_pipe()
handle:open(fd)
return M.StreamReader.new(handle)
end
--- Reads data from the stream.
---
---@async
---@param self StreamReader
---@return string? data The data read from the stream or nil if the stream is at EOF.
M.StreamReader.read = function(self)
if self.at_eof then
return nil
end
local data = self.buffer:pop()
if data == nil then
self.at_eof = true
end
return data
end
--- Reads remaining data from the stream.
---
---@async
---@param self StreamReader
---@return string data The data read from the stream.
M.StreamReader.read_until_eof = function(self)
local data = {}
local chunk = self:read()
local i = 1
while chunk ~= nil do
data[i] = chunk
i = i + 1
chunk = self:read()
end
return table.concat(data, "")
end
--- Closes the stream reader.
---
---@async
---@param self StreamReader
M.StreamReader.close = function(self)
return require("coop.uv").close(self.handle)
end
--- A stream writer is an async wrapper for a writable uv_stream_t.
---
--- This is useful for a subprocess’s stdin.
---
---@class StreamWriter
---@field handle uv_stream_t
---@field write async fun(StreamWriter, string)
---@field close async fun(StreamWriter)
M.StreamWriter = {}
--- Creates a stream writer.
---
---@param handle uv_stream_t The handle to a writable stream.
---@return StreamWriter stream_writer The stream writer object.
M.StreamWriter.new = function(handle)
if not vim.uv.is_writable(handle) then
error("Can not create a stream writer, because the handle is not writable.")
end
local stream_writer = { handle = handle }
local meta_stream_writer = {
__index = M.StreamWriter,
}
return setmetatable(stream_writer, meta_stream_writer)
end
--- Creates a stream writer from a file descriptor.
---
---@param fd integer The file descriptor.
---@return StreamWriter stream_writer The stream writer object.
M.StreamWriter.from_fd = function(fd)
local handle = vim.uv.new_pipe()
handle:open(fd)
return M.StreamWriter.new(handle)
end
--- Writes data to the stream.
---
---@async
---@param self StreamWriter
---@param data string The data to write.
---@return string? err
---@return string? err_name
M.StreamWriter.write = function(self, data)
return require("coop.uv").write(self.handle, data)
end
--- Closes the stream writer.
---
---@async
---@param self StreamWriter
M.StreamWriter.close = function(self)
return require("coop.uv").close(self.handle)
end
return M