Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use io_uring for files on linux #175

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 15 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ EpollEventDriver | yes | — | — | — | — | —
WinAPIEventDriver | — | yes | — | — | — | —
KqueueEventDriver | — | — | yes | yes¹ | — | —
LibasyncEventDriver | —¹| —¹| —¹| —¹| — | —
UringEventDriver | —¹| no | no | no | unknown | no

¹ planned, but not currenly implemented

Expand All @@ -48,20 +49,20 @@ The following compilers are tested and supported:
Driver development status
-------------------------

Feature \ EventDriver | Select | Epoll | WinAPI | Kqueue | Libasync
----------------------|--------|-------|---------|---------|----------
TCP Sockets | yes | yes | yes | yes | —
UDP Sockets | yes | yes | yes | yes | —
USDS | yes | yes | — | yes | —
DNS | yes | yes | yes | yes | —
Timers | yes | yes | yes | yes | —
Events | yes | yes | yes | yes | —
Unix Signals | yes² | yes | — | — | —
Files | yes | yes | yes | yes | —
UI Integration | yes¹ | yes¹ | yes | yes¹ | —
File watcher | yes² | yes | yes | yes² | —
Pipes | yes | yes | — | yes | —
Processes | yes | yes | — | yes | —
Feature \ EventDriver | Select | Epoll | WinAPI | Kqueue | Libasync | Uring
----------------------|--------|-------|---------|---------|----------|-------
TCP Sockets | yes | yes | yes | yes | — | —
UDP Sockets | yes | yes | yes | yes | — | —
USDS | yes | yes | — | yes | — | —
DNS | yes | yes | yes | yes | — | —
Timers | yes | yes | yes | yes | — | —
Events | yes | yes | yes | yes | — | —
Unix Signals | yes² | yes | — | — | — | —
Files | yes | yes | yes | yes | — | yes
UI Integration | yes¹ | yes¹ | yes | yes¹ | — | yes?
File watcher | yes² | yes | yes | yes² | — | —
Pipes | yes | yes | — | yes | — | —
Processes | yes | yes | — | yes | — | —

¹ Manually, by adopting the X11 display connection socket

Expand Down
7 changes: 6 additions & 1 deletion dub.sdl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name "eventcore"
description "Pro-actor based abstraction layer over operating system asynchronous I/O facilities."
license "MIT"
copyright "Copyright © 2016-2018 Sönke Ludwig"

dependency "during" version="~>0.2.1"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be in a new "uring" configuration at the bottom of the file. Is "during" a local package of yours? I don't see it on code.dlang.org.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since I can't test this locally in the current state, my only comment right now is that the integration should be optional as a separate build configuration. Once that is done, everything builds, and the white space changes and Meson build file changes are in separate commits, we could also just document it as experimental and merge it to master, so that improvements can be made in smaller incremental steps.

Great! Than I will polish it up to be suitable for master.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be in a new "uring" configuration at the bottom of the file. Is "during" a local package of yours? I don't see it on code.dlang.org.

It's https://github.com/tchaloupka/during from code.dlang.org

Copy link
Contributor Author

@Panke Panke Mar 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

white space changes

which one?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm it's really just two lines in utils.d, nevermind then, I thought it was more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's https://github.com/tchaloupka/during from code.dlang.org

Interesting, the search is obviously broken w.r.t. package names. Accessing it manually works, but I get zero results for "during".

targetType "library"

libs "resolv" platform="linux"
Expand Down Expand Up @@ -50,6 +50,11 @@ configuration "libasync" {
versions "EventcoreLibasyncDriver"
}

configuration "uring" {
platforms "linux"
versions "EventcoreEpollDriver" "EventcoreEpollUsesUring"
}

configuration "generic" {
// Defines eventDriver as the generic EventDriver interface. Setup must be done manually.
}
5 changes: 4 additions & 1 deletion meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ project_version_suffix = ''
project_version = meson.project_version()
project_version_full = project_version + project_version_suffix

taggedalgebraic_dep = dependency('taggedalgebraic', version: ['>=0.10.12', '<0.12'])
taggedalgebraic_dep = dependency('taggedalgebraic',
version: ['>=0.10.12', '<0.12'],
fallback: ['taggedalgebraic', 'taggedalgebraic_source_dep'])

source_root = meson.source_root()
build_root = meson.build_root()
subdir('source/eventcore')
subdir('examples')
1 change: 1 addition & 0 deletions source/eventcore/driver.d
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,7 @@ alias AcceptCallback = void delegate(StreamListenSocketFD, StreamSocketFD, scope
alias IOCallback = void delegate(StreamSocketFD, IOStatus, size_t);
alias DatagramIOCallback = void delegate(DatagramSocketFD, IOStatus, size_t, scope RefAddress);
alias DNSLookupCallback = void delegate(DNSLookupID, DNSStatus, scope RefAddress[]);
alias FileOpenCallback = void delegate(FileFD, IOStatus);
alias FileIOCallback = void delegate(FileFD, IOStatus, size_t);
alias FileCloseCallback = void delegate(FileFD, CloseStatus);
alias PipeIOCallback = void delegate(PipeFD, IOStatus, size_t);
Expand Down
46 changes: 37 additions & 9 deletions source/eventcore/drivers/posix/driver.d
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import eventcore.drivers.posix.sockets;
import eventcore.drivers.posix.watchers;
import eventcore.drivers.posix.processes;
import eventcore.drivers.posix.pipes;
import eventcore.drivers.posix.io_uring.io_uring : UringEventLoop, NoRing;
import eventcore.drivers.posix.io_uring.files : UringDriverFiles;
import eventcore.drivers.timer;
import eventcore.drivers.threadedfile;
import eventcore.internal.consumablequeue : ConsumableQueue;
Expand All @@ -37,12 +39,17 @@ private long currStdTime()
return Clock.currStdTime;
}

version(EventcoreEpollUsesUring) {
version (EventcoreEpollDriver) {}
else { static assert(false, "uring only supported together with epoll for now"); }
}

final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
@safe: /*@nogc:*/ nothrow:


private {
alias CoreDriver = PosixEventDriverCore!(Loop, TimerDriver, EventsDriver, ProcessDriver);
alias CoreDriver = PosixEventDriverCore!(Loop, TimerDriver, EventsDriver, ProcessDriver, UringCore);
alias EventsDriver = PosixEventDriverEvents!(Loop, SocketsDriver);
version (linux) alias SignalsDriver = SignalFDEventDriverSignals!Loop;
else alias SignalsDriver = DummyEventDriverSignals!Loop;
Expand All @@ -51,16 +58,20 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
version (Windows) alias DNSDriver = EventDriverDNS_GHBN!(EventsDriver, SignalsDriver);
else version (EventcoreUseGAIA) alias DNSDriver = EventDriverDNS_GAIA!(EventsDriver, SignalsDriver);
else alias DNSDriver = EventDriverDNS_GAI!(EventsDriver, SignalsDriver);
alias FileDriver = ThreadedFileEventDriver!EventsDriver;
version (EventcoreEpollUsesUring) alias FileDriver = UringDriverFiles;
else alias FileDriver = ThreadedFileEventDriver!EventsDriver;
version (Posix) alias PipeDriver = PosixEventDriverPipes!Loop;
else alias PipeDriver = DummyEventDriverPipes!Loop;
version (linux) alias WatcherDriver = InotifyEventDriverWatchers!EventsDriver;
else version (EventcoreCFRunLoopDriver) alias WatcherDriver = FSEventsEventDriverWatchers!EventsDriver;
else alias WatcherDriver = PollEventDriverWatchers!EventsDriver;
version (Posix) alias ProcessDriver = PosixEventDriverProcesses!Loop;
else alias ProcessDriver = DummyEventDriverProcesses!Loop;
version (EventcoreEpollUsesUring) alias UringCore = UringEventLoop;
else alias UringCore = NoRing;

Loop m_loop;
UringCore m_uring;
CoreDriver m_core;
EventsDriver m_events;
SignalsDriver m_signals;
Expand All @@ -76,15 +87,19 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
this()
@nogc @trusted {
m_loop = mallocT!Loop;
version (EventcoreEpollUsesUring) m_uring = mallocT!UringCore;
m_sockets = mallocT!SocketsDriver(m_loop);
m_events = mallocT!EventsDriver(m_loop, m_sockets);
m_signals = mallocT!SignalsDriver(m_loop);
m_timers = mallocT!TimerDriver;
m_pipes = mallocT!PipeDriver(m_loop);
m_processes = mallocT!ProcessDriver(m_loop, this);
m_core = mallocT!CoreDriver(m_loop, m_timers, m_events, m_processes);
m_dns = mallocT!DNSDriver(m_events, m_signals);
m_files = mallocT!FileDriver(m_events);
m_core = mallocT!CoreDriver(m_loop, m_timers, m_events, m_processes, m_uring);

version (EventcoreEpollUsesUring) m_files = mallocT!FileDriver(m_uring);
else m_files = mallocT!FileDriver(m_events);

m_watchers = mallocT!WatcherDriver(m_events);
}

Expand Down Expand Up @@ -160,7 +175,8 @@ final class PosixEventDriver(Loop : PosixEventLoop) : EventDriver {
}


final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTimers, Events : EventDriverEvents, Processes : EventDriverProcesses) : EventDriverCore {
final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTimers, Events : EventDriverEvents, Processes : EventDriverProcesses, Uring)
: EventDriverCore {
@safe nothrow:
import core.atomic : atomicLoad, atomicStore;
import core.sync.mutex : Mutex;
Expand All @@ -177,14 +193,16 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
Timers m_timers;
Events m_events;
Processes m_processes;
Uring m_uring;
bool m_exit = false;
EventID m_wakeupEvent;

shared Mutex m_threadCallbackMutex;
ConsumableQueue!ThreadCallbackEntry m_threadCallbacks;
}

this(Loop loop, Timers timers, Events events, Processes processes)
this(Loop loop, Timers timers, Events events, Processes processes,
Uring uring)
@nogc {
m_loop = loop;
m_timers = timers;
Expand All @@ -194,6 +212,8 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
m_threadCallbackMutex = mallocT!(shared(Mutex));
m_threadCallbacks = mallocT!(ConsumableQueue!ThreadCallbackEntry);
m_threadCallbacks.reserve(1000);
m_uring = uring;
m_uring.registerEventID(m_wakeupEvent);
}

final void dispose()
Expand All @@ -207,7 +227,9 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
} catch (Exception e) assert(false, e.msg);
}

@property size_t waiterCount() const { return m_loop.m_waiterCount + m_timers.pendingCount + m_processes.pendingCount; }
@property size_t waiterCount() const {
return m_loop.m_waiterCount + m_timers.pendingCount + m_processes.pendingCount + m_uring.waiterCount;
}

final override ExitReason processEvents(Duration timeout)
{
Expand All @@ -223,12 +245,17 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
if (!waiterCount) {
return ExitReason.outOfWaiters;
}

version (EventcoreEpollUsesUring) {
// this is required to make the kernel aware of
// submitted SEQ, otherwise the first call to
// process events could stall
m_uring.submit;
}
bool got_events;

if (timeout <= 0.seconds) {
got_events = m_loop.doProcessEvents(0.seconds);
m_timers.process(MonoTime.currTime);
version (EventcoreEpollUsesUring) got_events |= m_uring.doProcessEvents(0.seconds);
} else {
auto now = MonoTime.currTime;
do {
Expand All @@ -237,6 +264,7 @@ final class PosixEventDriverCore(Loop : PosixEventLoop, Timers : EventDriverTime
auto prev_step = now;
now = MonoTime.currTime;
got_events |= m_timers.process(now);
version(EventcoreEpollUsesUring) got_events |= m_uring.doProcessEvents(0.seconds);
if (timeout != Duration.max)
timeout -= now - prev_step;
} while (timeout > 0.seconds && !m_exit && !got_events);
Expand Down
5 changes: 3 additions & 2 deletions source/eventcore/drivers/posix/events.d
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
if (!() @trusted {
fd[1] = socket(AF_INET, SOCK_DGRAM, 0);
int nl = addr.nameLen;
import eventcore.internal.utils : print;
if (bind(fd[1], addr.name, addr.nameLen) != 0)
return false;
assert(nl == addr.nameLen);
Expand Down Expand Up @@ -167,7 +166,9 @@ final class PosixEventDriverEvents(Loop : PosixEventLoop, Sockets : EventDriverS
@trusted {
EventID event = cast(EventID)fd;
ulong cnt;
() @trusted { .read(cast(int)event, &cnt, cnt.sizeof); } ();
() @trusted {
.read(cast(int)event, &cnt, cnt.sizeof);
} ();
trigger(event, cnt > 0);
}
} else {
Expand Down
Loading