// The MIT License (MIT)
//
// Copyright (c) 2019 xtaci
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package gaio

import (
	"container/heap"
	"container/list"
	"io"
	"net"
	"reflect"
	"runtime"
	"sync"
	"sync/atomic"
	"syscall"
	"time"
)

var (
	aiocbPool sync.Pool
)

func init() {
	aiocbPool.New = func() interface{} {
		return new(aiocb)
	}
}

// fdDesc holds all data structures associated with a file descriptor (fd).
// It maintains lists of pending read and write requests, as well as a pointer
// to the associated net.Conn object.
type fdDesc struct {
	readers list.List // List of pending read requests
	writers list.List // List of pending write requests
	ptr     uintptr   // Pointer to the associated net.Conn object (stored as uintptr for GC safety)
}

// watcher is responsible for monitoring file descriptors, handling events,
// and processing asynchronous I/O requests. It manages event polling,
// maintains internal buffers, and interacts with various channels for signaling
// and communication.
type watcher struct {
	// poll fd
	pfd *poller // Poller for managing file descriptor events

	// netpoll signals
	chSignal chan Signal

	// Lists for managing pending asynchronous I/O operations
	// pendingXXX will be used interchangeably, like back buffer
	// pendingCreate <--> pendingProcessing
	chPendingNotify   chan struct{} // Channel for notifications about new I/O requests
	pendingCreate     []*aiocb      // List of I/O operations waiting to be processed
	pendingProcessing []*aiocb      // List of I/O operations currently under processing
	pendingMutex      sync.Mutex    // Mutex to synchronize access to pending operations
	recycles          []*aiocb      // List of completed I/O operations ready for reuse

	// IO-completion events to user
	chResults chan *aiocb

	// Internal buffers for managing read operations
	swapSize         int    // Capacity of the swap buffer (triple buffer system)
	swapBufferFront  []byte // Front buffer for reading
	swapBufferMiddle []byte // Middle buffer for reading
	swapBufferBack   []byte // Back buffer for reading
	bufferOffset     int    // Offset for the currently used buffer
	shouldSwap       int32  // Atomic flag indicating if a buffer swap is needed

	// Channel for setting CPU affinity in the watcher loop
	chCPUID chan int32

	// Maps and structures for managing file descriptors and connections
	descs      map[int]*fdDesc // Map of file descriptors to their associated fdDesc
	connIdents map[uintptr]int // Map of net.Conn pointers to unique identifiers (avoids GC issues)
	timeouts   timedHeap       // Heap for managing requests with timeouts
	timer      *time.Timer     // Timer for handling timeouts

	// Garbage collection
	gc       []uintptr     // List of connections to be garbage collected
	gcMutex  sync.Mutex    // Mutex to synchronize access to the gc list
	gcNotify chan struct{} // Channel to notify the GC processor
	gcFound  uint32        // number of net.Conn objects found unreachable by runtime
	gcClosed uint32        // number of objects closed successfully

	// Shutdown and cleanup
	die     chan struct{} // Channel for signaling shutdown
	dieOnce sync.Once     // Ensures that the watcher is only closed once
}

// NewWatcher creates a new Watcher instance with a default internal buffer size of 64KB.
func NewWatcher() (*Watcher, error) {
	return NewWatcherSize(defaultInternalBufferSize)
}

// NewWatcherSize creates a new Watcher instance with a specified internal buffer size.
//
// It allocates three shared buffers of the given size for handling read requests.
// This allows efficient management of read operations by using pre-allocated buffers.
func NewWatcherSize(bufsize int) (*Watcher, error) {
	w := new(watcher)

	// Initialize the poller for managing file descriptor events
	pfd, err := openPoll()
	if err != nil {
		return nil, err
	}
	w.pfd = pfd

	// Initialize channels for communication and signaling
	w.chCPUID = make(chan int32)
	w.chSignal = make(chan Signal, 1)
	w.chPendingNotify = make(chan struct{}, 1)
	w.chResults = make(chan *aiocb, maxEvents*4)
	w.die = make(chan struct{})

	// Allocate and initialize buffers for shared reading operations
	w.swapSize = bufsize
	w.swapBufferFront = make([]byte, bufsize)
	w.swapBufferMiddle = make([]byte, bufsize)
	w.swapBufferBack = make([]byte, bufsize)

	// Initialize data structures for managing file descriptors and connections
	w.descs = make(map[int]*fdDesc)
	w.connIdents = make(map[uintptr]int)
	w.gcNotify = make(chan struct{}, 1)
	w.timer = time.NewTimer(0)

	// Start background goroutines for netpoll and the main loop
	go w.pfd.Wait(w.chSignal)
	go w.loop()

	// Set up a finalizer to ensure resources are cleaned up when the Watcher is garbage collected
	// NOTE: we need a manual garbage collection mechanism for watcher
	wrapper := &Watcher{watcher: w}
	runtime.SetFinalizer(wrapper, func(wrapper *Watcher) {
		wrapper.Close()
	})
	return wrapper, nil
}
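
// Illustrative usage sketch: one Watcher is created, reads and writes are
// submitted, and completions are consumed via WaitIO. Here 'conn' is assumed
// to be any pointer-backed net.Conn (e.g. a *net.TCPConn from Accept), and
// handleResults is a hypothetical helper.
//
//	w, err := gaio.NewWatcher()
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer w.Close()
//
//	// a nil buffer asks the watcher to read into its shared internal buffer
//	if err := w.Read(nil, conn, nil); err != nil {
//		log.Fatal(err)
//	}
//
//	for {
//		results, err := w.WaitIO()
//		if err != nil {
//			return // watcher closed
//		}
//		handleResults(results)
//	}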

// SetPollerAffinity sets the CPU affinity of the poller goroutine (epoll/kqueue).
func (w *watcher) SetPollerAffinity(cpuid int) (err error) {
	if cpuid >= runtime.NumCPU() {
		return ErrCPUID
	}

	// store the cpuid and wake up the poller
	atomic.StoreInt32(&w.pfd.cpuid, int32(cpuid))
	w.pfd.wakeup()
	return nil
}

// SetLoopAffinity sets the CPU affinity of the watcher loop, which issues
// syscall.Read/syscall.Write.
func (w *watcher) SetLoopAffinity(cpuid int) (err error) {
	if cpuid >= runtime.NumCPU() {
		return ErrCPUID
	}

	// send the cpuid to the watcher loop
	select {
	case w.chCPUID <- int32(cpuid):
	case <-w.die:
		return ErrConnClosed
	}
	return nil
}
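
// Illustrative affinity sketch: pinning the poller and the I/O loop to
// separate cores; the core indices below are arbitrary assumptions.
//
//	w, _ := gaio.NewWatcher()
//	if err := w.SetPollerAffinity(0); err != nil {
//		log.Println("poller affinity:", err)
//	}
//	if err := w.SetLoopAffinity(1); err != nil {
//		log.Println("loop affinity:", err)
//	}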

// Close stops monitoring events for all connections.
func (w *watcher) Close() (err error) {
	w.dieOnce.Do(func() {
		close(w.die)
		err = w.pfd.Close()
	})
	return err
}

// notifyPending signals the watcher loop that new operations are pending.
func (w *watcher) notifyPending() {
	select {
	case w.chPendingNotify <- struct{}{}:
	default:
	}
}

// WaitIO blocks until one or more read/write operations are completed or an error occurs.
// It returns a slice of OpResult containing details of completed operations and any errors encountered.
//
// The method operates as follows:
// 1. It recycles previously used aiocb objects to avoid memory leaks and reuses them for new I/O operations.
// 2. It waits for completion notifications from the chResults channel and accumulates results.
// 3. It ensures that the buffer in OpResult is not overwritten until the next call to WaitIO.
func (w *watcher) WaitIO() (r []OpResult, err error) {
	// recycle previous aiocb
	for k := range w.recycles {
		aiocbPool.Put(w.recycles[k])
		// avoid memory leak
		w.recycles[k] = nil
	}
	w.recycles = w.recycles[:0]

	for {
		select {
		case pcb := <-w.chResults:
			r = append(r, OpResult{Operation: pcb.op, Conn: pcb.conn, IsSwapBuffer: pcb.useSwap, Buffer: pcb.buffer, Size: pcb.size, Error: pcb.err, Context: pcb.ctx})
			// avoid memory leak
			pcb.ctx = nil
			w.recycles = append(w.recycles, pcb)
			for len(w.chResults) > 0 {
				pcb := <-w.chResults
				r = append(r, OpResult{Operation: pcb.op, Conn: pcb.conn, IsSwapBuffer: pcb.useSwap, Buffer: pcb.buffer, Size: pcb.size, Error: pcb.err, Context: pcb.ctx})
				// avoid memory leak
				pcb.ctx = nil
				w.recycles = append(w.recycles, pcb)
			}

			// The buffer swapping mechanism ensures that the 'Buffer' in the returned OpResult
			// is not overwritten until the next call to WaitIO. This allows the user to safely
			// access the buffer without worrying about it being modified by subsequent operations.
			//
			// We use a triple buffer system to manage the buffers efficiently. This system
			// maintains three types of buffer states during operations:
			//
			// 1. **DONE**: Results are fully processed and accessible to the user.
			// 2. **INFLIGHT**: Results are completed but still being delivered to chResults.
			// 3. **WRITING**: Results are being written to the next buffer.
			//
			//	T0:  DONE(B0) | INFLIGHT DELIVERY(B0)
			//	     switching to B1
			//	T0': WRITING(B1)
			//
			//	T1:  DONE(B0+B1) | INFLIGHT DELIVERY(B1)
			//	     switching to B2
			//	T1': WRITING(B2)
			//
			//	T2:  DONE(B1+B2) | INFLIGHT DELIVERY(B2)
			//	     switching to B0
			//	T2': WRITING(B0)
			//	- and so on...
			//
			// Atomic operation ensures synchronization for buffer swapping.
			atomic.CompareAndSwapInt32(&w.shouldSwap, 0, 1)
			return r, nil

		case <-w.die:
			return nil, ErrWatcherClosed
		}
	}
}
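
// Illustrative result-handling sketch: the Buffer of a completed read is only
// guaranteed valid until the next WaitIO call, so data that must outlive it
// should be copied first. 'process' is a hypothetical handler.
//
//	results, err := w.WaitIO()
//	if err != nil {
//		return
//	}
//	for _, res := range results {
//		if res.Operation == gaio.OpRead && res.Error == nil {
//			kept := make([]byte, res.Size)
//			copy(kept, res.Buffer[:res.Size]) // copy before the next WaitIO reclaims the swap buffer
//			process(res.Conn, kept)
//		}
//	}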

// Read submits an asynchronous read request on 'conn' with context 'ctx' and optional buffer 'buf'.
// If 'buf' is nil, an internal buffer is used. 'ctx' is a user-defined value passed unchanged.
func (w *watcher) Read(ctx interface{}, conn net.Conn, buf []byte) error {
	return w.aioCreate(ctx, OpRead, conn, buf, zeroTime, false)
}

// ReadTimeout submits an asynchronous read request on 'conn' with context 'ctx' and buffer 'buf',
// expecting to read some bytes before 'deadline'. 'ctx' is a user-defined value passed unchanged.
func (w *watcher) ReadTimeout(ctx interface{}, conn net.Conn, buf []byte, deadline time.Time) error {
	return w.aioCreate(ctx, OpRead, conn, buf, deadline, false)
}

// ReadFull submits an asynchronous read request on 'conn' with context 'ctx' and buffer 'buf',
// expecting to fill the buffer before 'deadline'. 'ctx' is a user-defined value passed unchanged.
// 'buf' must not be nil for ReadFull.
func (w *watcher) ReadFull(ctx interface{}, conn net.Conn, buf []byte, deadline time.Time) error {
	if len(buf) == 0 {
		return ErrEmptyBuffer
	}
	return w.aioCreate(ctx, OpRead, conn, buf, deadline, true)
}

// Write submits an asynchronous write request on 'conn' with context 'ctx' and buffer 'buf'.
// 'ctx' is a user-defined value passed unchanged.
func (w *watcher) Write(ctx interface{}, conn net.Conn, buf []byte) error {
	if len(buf) == 0 {
		return ErrEmptyBuffer
	}
	return w.aioCreate(ctx, OpWrite, conn, buf, zeroTime, false)
}

// WriteTimeout submits an asynchronous write request on 'conn' with context 'ctx' and buffer 'buf',
// expecting to complete writing before 'deadline'. 'ctx' is a user-defined value passed unchanged.
func (w *watcher) WriteTimeout(ctx interface{}, conn net.Conn, buf []byte, deadline time.Time) error {
	if len(buf) == 0 {
		return ErrEmptyBuffer
	}
	return w.aioCreate(ctx, OpWrite, conn, buf, deadline, false)
}

// Free releases resources related to 'conn' immediately, such as socket file descriptors.
func (w *watcher) Free(conn net.Conn) error {
	return w.aioCreate(nil, opDelete, conn, nil, zeroTime, false)
}
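
// Illustrative deadline sketch: ReadFull with a deadline is handy for
// fixed-size frames. The 4-byte header and the 5-second deadline below are
// arbitrary assumptions; a missed deadline is reported through OpResult.Error
// as ErrDeadline on a later WaitIO call.
//
//	header := make([]byte, 4)
//	if err := w.ReadFull(nil, conn, header, time.Now().Add(5*time.Second)); err != nil {
//		log.Println("submit:", err)
//	}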

// aioCreate initiates an asynchronous IO operation with the given parameters.
// It creates an aiocb structure and adds it to the pending queue, then notifies the watcher.
func (w *watcher) aioCreate(ctx interface{}, op OpType, conn net.Conn, buf []byte, deadline time.Time, readfull bool) error {
	select {
	case <-w.die:
		return ErrWatcherClosed
	default:
		var ptr uintptr
		if conn != nil && reflect.TypeOf(conn).Kind() == reflect.Ptr {
			ptr = reflect.ValueOf(conn).Pointer()
		} else {
			return ErrUnsupported
		}

		cb := aiocbPool.Get().(*aiocb)
		*cb = aiocb{op: op, ptr: ptr, size: 0, ctx: ctx, conn: conn, buffer: buf, deadline: deadline, readFull: readfull, idx: -1}

		w.pendingMutex.Lock()
		w.pendingCreate = append(w.pendingCreate, cb)
		w.pendingMutex.Unlock()

		w.notifyPending()
		return nil
	}
}

// tryRead attempts to read data on aiocb and notify the completion.
// Returns true if the operation is completed; false if it is not completed and will retry later.
func (w *watcher) tryRead(fd int, pcb *aiocb) bool {
	// step 1. bind to proper buffer
	buf := pcb.buffer
	useSwap := false
	backBuffer := false

	if buf == nil {
		if atomic.CompareAndSwapInt32(&w.shouldSwap, 1, 0) {
			// A successful CAS operation triggers internal buffer swapping:
			//
			// Initial State:
			//
			//	+-------+    +--------+    +------+
			//	| Front | -> | Middle | -> | Back |
			//	+-------+    +--------+    +------+
			//	    |                          ^
			//	    |__________________________|
			//
			// After One Circular Shift:
			//
			//	+--------+    +------+    +-------+
			//	| Middle | -> | Back | -> | Front |
			//	+--------+    +------+    +-------+
			//	    |                          ^
			//	    |__________________________|
			//
			// After Two Circular Shifts:
			//
			//	+------+    +-------+    +--------+
			//	| Back | -> | Front | -> | Middle |
			//	+------+    +-------+    +--------+
			//	    |                          ^
			//	    |__________________________|
			w.swapBufferFront, w.swapBufferMiddle, w.swapBufferBack = w.swapBufferMiddle, w.swapBufferBack, w.swapBufferFront
			w.bufferOffset = 0
		}

		buf = w.swapBufferFront[w.bufferOffset:]
		if len(buf) > 0 {
			useSwap = true
		} else {
			backBuffer = true
			buf = pcb.backBuffer[:]
		}
	}

	// step 2. read into buffer
	for {
		nr, er := rawRead(fd, buf[pcb.size:])
		if er == syscall.EAGAIN {
			return false
		}

		// On macOS we can see EINTR here if the user pressed ^Z.
		if er == syscall.EINTR {
			continue
		}

		// if er is nil, accumulate bytes read
		if er == nil {
			pcb.size += nr
		}

		pcb.err = er
		// proper setting of EOF
		if nr == 0 && er == nil {
			pcb.err = io.EOF
		}
		break
	}

	// step 3. check the read-full operation
	// the buffer of a read-full operation is guaranteed by the caller
	if pcb.readFull { // read full operation
		if pcb.err != nil {
			// the operation is completed due to error
			return true
		}

		if pcb.size == len(pcb.buffer) {
			// the operation is completed normally
			return true
		}
		return false
	}

	// step 4. non read-full operations
	if useSwap { // IO completed with internal buffer
		pcb.useSwap = true
		pcb.buffer = buf[:pcb.size] // set len to pcb.size
		w.bufferOffset += pcb.size
	} else if backBuffer { // use per-request tiny buffer
		pcb.buffer = buf
	}
	return true
}

// tryWrite attempts to write data on aiocb and notifies the completion.
// Returns true if the operation is completed; false if it is not completed and will retry later.
func (w *watcher) tryWrite(fd int, pcb *aiocb) bool {
	var nw int
	var ew error

	if pcb.buffer != nil {
		for {
			nw, ew = rawWrite(fd, pcb.buffer[pcb.size:])
			pcb.err = ew

			// Socket buffer is full
			if ew == syscall.EAGAIN {
				return false
			}

			// On macOS/BSDs, if mbufs run out, ENOBUFS will be returned
			// https://man.freebsd.org/cgi/man.cgi?query=mbuf&sektion=9&format=html
			if ew == syscall.ENOBUFS {
				return false
			}

			// On macOS we can see EINTR here if the user pressed ^Z.
			if ew == syscall.EINTR {
				continue
			}

			// If no error, accumulate bytes written
			if ew == nil {
				pcb.size += nw
			}
			break
		}
	}

	// Returns true if all bytes are written or there are errors on the socket
	if pcb.size == len(pcb.buffer) || ew != nil {
		return true
	}

	// Should retry later
	return false
}

// releaseConn releases resources related to the connection identified by 'ident'.
func (w *watcher) releaseConn(ident int) {
	if desc, ok := w.descs[ident]; ok {
		// Remove all pending read requests
		for e := desc.readers.Front(); e != nil; e = e.Next() {
			tcb := e.Value.(*aiocb)
			// Notify caller with error
			tcb.err = io.ErrClosedPipe
			w.deliver(tcb)
		}

		// Remove all pending write requests
		for e := desc.writers.Front(); e != nil; e = e.Next() {
			tcb := e.Value.(*aiocb)
			// Notify caller with error
			tcb.err = io.ErrClosedPipe
			w.deliver(tcb)
		}

		// Purge the fdDesc
		delete(w.descs, ident)
		delete(w.connIdents, desc.ptr)

		// Close the socket file descriptor duplicated from net.Conn
		syscall.Close(ident)
	}
}

// deliver sends the aiocb to the user to retrieve the results.
func (w *watcher) deliver(pcb *aiocb) {
	if pcb.idx != -1 {
		heap.Remove(&w.timeouts, pcb.idx)
	}

	select {
	case w.chResults <- pcb:
	case <-w.die:
	}
}

// loop is the core event loop of the watcher, handling various events and tasks.
func (w *watcher) loop() {
	// Defer function to release all resources
	defer func() {
		for ident := range w.descs {
			w.releaseConn(ident)
		}
	}()

	for {
		select {
		case <-w.chPendingNotify:
			// Swap w.pendingCreate with w.pendingProcessing
			w.pendingMutex.Lock()
			w.pendingCreate, w.pendingProcessing = w.pendingProcessing, w.pendingCreate
			for i := 0; i < len(w.pendingCreate); i++ {
				w.pendingCreate[i] = nil
			}
			w.pendingCreate = w.pendingCreate[:0]
			w.pendingMutex.Unlock()

			// handlePending is a synchronous operation to process all pending requests
			w.handlePending(w.pendingProcessing)

		case sig := <-w.chSignal: // Poller events
			w.handleEvents(sig.events)
			select {
			case sig.done <- struct{}{}:
			case <-w.die:
				return
			}

		case <-w.timer.C: // a global timeout heap to handle all timeouts
			for w.timeouts.Len() > 0 {
				now := time.Now()
				pcb := w.timeouts[0]
				if now.After(pcb.deadline) {
					// ErrDeadline
					pcb.err = ErrDeadline
					// remove from list
					pcb.l.Remove(pcb.elem)
					// deliver with error: ErrDeadline
					w.deliver(pcb)
				} else {
					w.timer.Reset(pcb.deadline.Sub(now))
					break
				}
			}

		case <-w.gcNotify:
			w.handleGC()

		case cpuid := <-w.chCPUID:
			setAffinity(cpuid)

		case <-w.die:
			return
		}
	}
}

// handleGC processes the garbage collection of net.Conn objects.
func (w *watcher) handleGC() {
	runtime.GC()

	w.gcMutex.Lock()
	if len(w.gc) > 0 {
		for _, ptr := range w.gc {
			if ident, ok := w.connIdents[ptr]; ok {
				w.releaseConn(ident)
			}
		}
		w.gcClosed += uint32(len(w.gc))
		w.gc = w.gc[:0]
	}
	w.gcMutex.Unlock()
}

// handlePending processes new requests, acting as a reception desk.
func (w *watcher) handlePending(pending []*aiocb) {
PENDING:
	for _, pcb := range pending {
		ident, ok := w.connIdents[pcb.ptr]
		// Resource releasing operation
		if pcb.op == opDelete && ok {
			w.releaseConn(ident)
			continue
		}

		// Check if the file descriptor is already registered
		var desc *fdDesc
		if ok {
			desc = w.descs[ident]
		} else {
			// New file descriptor registration
			if dupfd, err := dupconn(pcb.conn); err != nil {
				// unexpected situation, notify the caller if we cannot dup(2)
				pcb.err = err
				w.deliver(pcb)
				continue
			} else {
				// as we duplicated successfully, we're safe to
				// close the original connection
				pcb.conn.Close()

				// assign idents
				ident = dupfd

				// let epoll or kqueue watch this fd!
				werr := w.pfd.Watch(ident)
				if werr != nil {
					// unexpected situation, notify the caller if we cannot watch
					pcb.err = werr
					w.deliver(pcb)
					continue
				}

				// update the registration table
				desc = &fdDesc{ptr: pcb.ptr}
				w.descs[ident] = desc
				w.connIdents[pcb.ptr] = ident

				// the 'conn' object is still useful for the GC finalizer.
				// note: the finalizer function must not hold a reference to the net.Conn,
				// otherwise it will never be GC-ed.
				runtime.SetFinalizer(pcb.conn, func(c net.Conn) {
					w.gcMutex.Lock()
					ptr := reflect.ValueOf(c).Pointer()
					w.gc = append(w.gc, ptr)
					w.gcFound++
					w.gcMutex.Unlock()

					// notify the gc processor
					select {
					case w.gcNotify <- struct{}{}:
					default:
					}
				})
			}
		}

		// as the file descriptor is registered, we can proceed to IO operations
		switch pcb.op {
		case OpRead:
			// if there are no pending read requests,
			// we can try to read immediately
			if desc.readers.Len() == 0 {
				if w.tryRead(ident, pcb) {
					w.deliver(pcb)
					// request fulfilled, continue to the next
					continue PENDING
				}
			}

			// if the request is not fulfilled, queue it
			pcb.l = &desc.readers
			pcb.elem = pcb.l.PushBack(pcb)

		case OpWrite:
			if desc.writers.Len() == 0 {
				if w.tryWrite(ident, pcb) {
					w.deliver(pcb)
					continue PENDING
				}
			}

			pcb.l = &desc.writers
			pcb.elem = pcb.l.PushBack(pcb)
		}

		// if the request has a deadline set, push it to the timeout heap
		if !pcb.deadline.IsZero() {
			heap.Push(&w.timeouts, pcb)
			if w.timeouts.Len() == 1 {
				w.timer.Reset(time.Until(pcb.deadline))
			}
		}
	}
}

// handleEvents processes a batch of poller events and manages I/O operations for the associated file descriptors.
// Each event contains information about file descriptor activity, and the handler ensures that read and write
// operations are completed correctly even if the file descriptor has been re-opened after being closed.
//
// Note: If a file descriptor is closed externally (e.g., by conn.Close()), and then re-opened with the same
// handler number (fd), operations on the old fd can lead to errors. To handle this, the watcher duplicates the
// file descriptor from net.Conn, and operations are based on the unique identifier 'e.ident'. This prevents
// misreading or miswriting on re-created file descriptors.
//
// The poller automatically removes closed file descriptors from the event poller (epoll(7), kqueue(2)), so we
// need to handle these events correctly and ensure that all pending operations are processed.
func (w *watcher) handleEvents(events pollerEvents) {
	for _, e := range events {
		if desc, ok := w.descs[e.ident]; ok {
			// Process read events if the event indicates a read operation
			if e.ev&EV_READ != 0 {
				var next *list.Element
				// try to complete all read requests
				for elem := desc.readers.Front(); elem != nil; elem = next {
					next = elem.Next()
					pcb := elem.Value.(*aiocb)
					if w.tryRead(e.ident, pcb) {
						w.deliver(pcb)            // Deliver the completed read operation
						desc.readers.Remove(elem) // Remove the completed read request from the queue
					} else {
						// Stop processing further read requests if the current read operation fails
						break
					}
				}
			}

			// Process write events if the event indicates a write operation
			if e.ev&EV_WRITE != 0 {
				var next *list.Element
				for elem := desc.writers.Front(); elem != nil; elem = next {
					next = elem.Next()
					pcb := elem.Value.(*aiocb)
					if w.tryWrite(e.ident, pcb) {
						w.deliver(pcb)
						desc.writers.Remove(elem)
					} else {
						break
					}
				}
			}
		}
	}
}

// GetGC returns the number of net.Conn objects found unreachable by the runtime (gcFound)
// and the number of objects successfully closed by the GC processor (gcClosed).
func (w *watcher) GetGC() (found uint32, closed uint32) {
	w.gcMutex.Lock()
	defer w.gcMutex.Unlock()
	return w.gcFound, w.gcClosed
}
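
// Illustrative monitoring sketch: periodically comparing the two counters can
// reveal connections that were dropped without an explicit Free; the one-minute
// interval below is an arbitrary assumption.
//
//	go func() {
//		for range time.Tick(time.Minute) {
//			found, closed := w.GetGC()
//			log.Printf("gc: found=%d closed=%d", found, closed)
//		}
//	}()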