-
Notifications
You must be signed in to change notification settings - Fork 12
/
dnsredir.go
202 lines (170 loc) · 5.22 KB
/
dnsredir.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
/*
* Created Feb 16, 2020
*/
package dnsredir
import (
"context"
"errors"
"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/debug"
"github.com/coredns/coredns/plugin/metrics"
clog "github.com/coredns/coredns/plugin/pkg/log"
"github.com/coredns/coredns/request"
"github.com/miekg/dns"
"strconv"
"sync/atomic"
"time"
)
var log = clog.NewWithPlugin(pluginName)
type Dnsredir struct {
Next plugin.Handler
Upstreams *[]Upstream
}
// Upstream manages a pool of proxy upstream hosts
// see: github.com/coredns/proxy#proxy.go
type Upstream interface {
// Check if given domain name should be routed to this upstream zone
Match(name string) bool
// Select an upstream host to be routed to, nil if no available host
Select() *UpstreamHost
// Exchanger returns the exchanger to be used for this upstream
//Exchanger() interface{}
// Send question to upstream host and await for response
//Exchange(ctx context.Context, state request.Request) (*dns.Msg, error)
Start() error
Stop() error
}
func (r *Dnsredir) OnStartup() error {
for _, up := range *r.Upstreams {
if err := up.Start(); err != nil {
return err
}
}
return nil
}
func (r *Dnsredir) OnShutdown() error {
for _, up := range *r.Upstreams {
if err := up.Stop(); err != nil {
return err
}
}
return nil
}
func (r *Dnsredir) ServeDNS(ctx context.Context, w dns.ResponseWriter, req *dns.Msg) (int, error) {
state := &request.Request{W: w, Req: req}
name := state.Name()
server := metrics.WithServer(ctx)
upstream0, t := r.match(server, name)
if upstream0 == nil {
log.Debugf("%q not found in name list, t: %v", name, t)
return plugin.NextOrFailure(r.Name(), r.Next, ctx, w, req)
}
upstream := upstream0.(*reloadableUpstream)
log.Debugf("%q in name list, t: %v", name, t)
var reply *dns.Msg
var upstreamErr error
var tryCount int32
deadline := time.Now().Add(defaultTimeout)
for time.Now().Before(deadline) {
start := time.Now()
tryCount++
host := upstream.Select()
if host == nil || tryCount > upstream.maxRetry {
log.Debug(errNoHealthy)
return dns.RcodeServerFailure, errNoHealthy
}
log.Debugf("Upstream host %v is selected", host.Name())
for {
t := time.Now()
reply, upstreamErr = host.Exchange(ctx, state, upstream.bootstrap, upstream.noIPv6)
log.Debugf("rtt: %v", time.Since(t))
if upstreamErr == errCachedConnClosed {
// [sic] Remote side closed conn, can only happen with TCP.
// Retry for another connection
log.Debugf("%v: %v", upstreamErr, host.Name())
continue
}
break
}
if upstreamErr != nil {
if upstream.maxFails != 0 {
log.Warningf("Exchange() failed error: %v", upstreamErr)
healthCheck(upstream, host)
}
continue
}
if !state.Match(reply) {
debug.Hexdumpf(reply, "Wrong reply id: %v, qname: %v qtype: %v", reply.Id, state.QName(), state.QType())
formerr := new(dns.Msg)
formerr.SetRcode(state.Req, dns.RcodeFormatError)
_ = w.WriteMsg(formerr)
return dns.RcodeSuccess, nil
}
// Add resolved IPs to ipset/pf before write response to DNS resolver
// thus the rule based routing can take effect immediately
ipsetAddIP(upstream, reply)
pfAddIP(upstream, reply)
_ = w.WriteMsg(reply)
RequestDuration.WithLabelValues(server, host.Name()).Observe(float64(time.Since(start).Milliseconds()))
RequestCount.WithLabelValues(server, host.Name()).Inc()
rc, ok := dns.RcodeToString[reply.Rcode]
if !ok {
rc = strconv.Itoa(reply.Rcode)
}
RcodeCount.WithLabelValues(server, host.Name(), rc).Inc()
return dns.RcodeSuccess, nil
}
if upstreamErr == nil {
panic("Why upstreamErr is nil?! Are you in a debugger or your machine running slow?")
}
return dns.RcodeServerFailure, upstreamErr
}
func healthCheck(r *reloadableUpstream, uh *UpstreamHost) {
// Skip unnecessary health checking
if r.checkInterval == 0 || r.maxFails == 0 {
return
}
failTimeout := defaultFailTimeout
fails := atomic.AddInt32(&uh.fails, 1)
go func(uh *UpstreamHost) {
time.Sleep(failTimeout)
// Failure count may go negative here, should be rectified by HC eventually
atomic.AddInt32(&uh.fails, -1)
// Kick off health check on every failureCheck failure
if fails%failureCheck == 0 {
_ = uh.Check()
}
}(uh)
}
func (r *Dnsredir) Name() string { return pluginName }
func (r *Dnsredir) match(server, name string) (Upstream, time.Duration) {
t1 := time.Now()
if r.Upstreams == nil {
panic("Why Dnsredir.Upstreams is nil?!")
}
// Don't check validity of domain name, delegate to upstream host
if len(name) > 1 {
name = removeTrailingDot(name)
}
for _, up := range *r.Upstreams {
// For maximum performance, we search the first matched item and return directly
// Unlike proxy plugin, which try to find longest match
if up.Match(name) {
t2 := time.Since(t1)
NameLookupDuration.WithLabelValues(server, "1").Observe(float64(t2.Milliseconds()))
return up, t2
}
}
t2 := time.Since(t1)
NameLookupDuration.WithLabelValues(server, "0").Observe(float64(t2.Milliseconds()))
return nil, t2
}
var (
errNoHealthy = errors.New("no healthy upstream host")
errCachedConnClosed = errors.New("cached connection was closed by peer")
)
const (
defaultTimeout = 15 * time.Second
defaultFailTimeout = 2000 * time.Millisecond
failureCheck = 3
)