-
Notifications
You must be signed in to change notification settings - Fork 17
/
server_session.go
157 lines (137 loc) · 4.04 KB
/
server_session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package dmsg
import (
"io"
"net"
"github.com/sirupsen/logrus"
"github.com/skycoin/yamux"
"github.com/skycoin/dmsg/netutil"
"github.com/skycoin/dmsg/noise"
"github.com/skycoin/dmsg/servermetrics"
)
// ServerSession represents a session from the perspective of a dmsg server.
type ServerSession struct {
*SessionCommon
m servermetrics.Metrics
}
func makeServerSession(m servermetrics.Metrics, entity *EntityCommon, conn net.Conn) (ServerSession, error) {
var sSes ServerSession
sSes.SessionCommon = new(SessionCommon)
sSes.nMap = make(noise.NonceMap)
if err := sSes.SessionCommon.initServer(entity, conn); err != nil {
m.RecordSession(servermetrics.DeltaFailed) // record failed connection
return sSes, err
}
sSes.m = m
return sSes, nil
}
// Close implements io.Closer
func (ss *ServerSession) Close() error {
if ss == nil {
return nil
}
return ss.SessionCommon.Close()
}
// Serve serves the session.
func (ss *ServerSession) Serve() {
ss.m.RecordSession(servermetrics.DeltaConnect) // record successful connection
defer ss.m.RecordSession(servermetrics.DeltaDisconnect) // record disconnection
for {
yStr, err := ss.ys.AcceptStream()
if err != nil {
switch err {
case yamux.ErrSessionShutdown, io.EOF:
ss.log.WithError(err).Info("Stopping session...")
default:
ss.log.WithError(err).Warn("Failed to accept stream, stopping session...")
}
return
}
log := ss.log.WithField("yamux_id", yStr.StreamID())
log.Info("Initiating stream.")
go func(yStr *yamux.Stream) {
err := ss.serveStream(log, yStr)
log.WithError(err).Info("Stopped stream.")
}(yStr)
}
}
func (ss *ServerSession) serveStream(log logrus.FieldLogger, yStr *yamux.Stream) error {
readRequest := func() (StreamRequest, error) {
obj, err := ss.readObject(yStr)
if err != nil {
return StreamRequest{}, err
}
req, err := obj.ObtainStreamRequest()
if err != nil {
return StreamRequest{}, err
}
// TODO(evanlinjin): Implement timestamp tracker.
if err := req.Verify(0); err != nil {
return StreamRequest{}, err
}
if req.SrcAddr.PK != ss.rPK {
return StreamRequest{}, ErrReqInvalidSrcPK
}
return req, nil
}
// Read request.
req, err := readRequest()
if err != nil {
ss.m.RecordStream(servermetrics.DeltaFailed) // record failed stream
return err
}
log = log.
WithField("src_addr", req.SrcAddr).
WithField("dst_addr", req.DstAddr)
log.Debug("Read stream request from initiating side.")
// Obtain next session.
ss2, ok := ss.entity.serverSession(req.DstAddr.PK)
if !ok {
ss.m.RecordStream(servermetrics.DeltaFailed) // record failed stream
return ErrReqNoNextSession
}
log.Debug("Obtained next session.")
// Forward request and obtain/check response.
yStr2, resp, err := ss2.forwardRequest(req)
if err != nil {
ss.m.RecordStream(servermetrics.DeltaFailed) // record failed stream
return err
}
log.Debug("Forwarded stream request.")
// Forward response.
if err := ss.writeObject(yStr, resp); err != nil {
ss.m.RecordStream(servermetrics.DeltaFailed) // record failed stream
return err
}
log.Debug("Forwarded stream response.")
// Serve stream.
log.Info("Serving stream.")
ss.m.RecordStream(servermetrics.DeltaConnect) // record successful stream
defer ss.m.RecordStream(servermetrics.DeltaDisconnect) // record disconnection
return netutil.CopyReadWriteCloser(yStr, yStr2)
}
func (ss *ServerSession) forwardRequest(req StreamRequest) (yStr *yamux.Stream, respObj SignedObject, err error) {
defer func() {
if err != nil && yStr != nil {
ss.log.
WithError(yStr.Close()).
Debugf("After forwardRequest failed, the yamux stream is closed.")
}
}()
if yStr, err = ss.ys.OpenStream(); err != nil {
return nil, nil, err
}
if err = ss.writeObject(yStr, req.raw); err != nil {
return nil, nil, err
}
if respObj, err = ss.readObject(yStr); err != nil {
return nil, nil, err
}
var resp StreamResponse
if resp, err = respObj.ObtainStreamResponse(); err != nil {
return nil, nil, err
}
if err = resp.Verify(req); err != nil {
return nil, nil, err
}
return yStr, respObj, nil
}