Skip to content

Commit

Permalink
all: tons of builder work
Browse files Browse the repository at this point in the history
* reverse buildlet rework (multiplexed TCP connections, instead
  of a hacky reverse roundtripper)

* scaleway ARM image improvements

* parallel gzip implementation, which makes things ~8x faster on
  Scaleway.

* merge watcher into the coordinator, for easier deployments

Change-Id: I55d769f982e6583b261435309faa1f718a15fde1
Reviewed-on: https://go-review.googlesource.com/12665
Reviewed-by: Brad Fitzpatrick <[email protected]>
  • Loading branch information
bradfitz committed Sep 15, 2015
1 parent c018f8c commit 1f0d8f2
Show file tree
Hide file tree
Showing 27 changed files with 1,618 additions and 543 deletions.
21 changes: 20 additions & 1 deletion buildlet/buildletclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ func (c *Client) SetCloseFunc(fn func() error) {
c.closeFunc = fn
}

var ErrClosed = errors.New("buildlet: Client closed")

func (c *Client) Close() error {
c.setPeerDead(errors.New("Close called"))
c.setPeerDead(ErrClosed) // TODO(bradfitz): split concept of closed vs. broken?
var err error
if c.closeFunc != nil {
err = c.closeFunc()
Expand All @@ -67,6 +69,10 @@ func (c *Client) Close() error {
// To be called only via c.setPeerDeadOnce.Do(s.setPeerDead)
func (c *Client) setPeerDead(err error) {
c.setPeerDeadOnce.Do(func() {
c.MarkBroken()
if err == nil {
err = errors.New("peer dead (no specific error)")
}
c.deadErr = err
close(c.peerDead)
})
Expand Down Expand Up @@ -156,6 +162,13 @@ func (c *Client) URL() string {

func (c *Client) IPPort() string { return c.ipPort }

func (c *Client) Name() string {
if c.ipPort != "" {
return c.ipPort
}
return "(unnamed-buildlet)"
}

// MarkBroken marks this client as broken in some way.
func (c *Client) MarkBroken() {
c.mu.Lock()
Expand Down Expand Up @@ -571,6 +584,12 @@ type Status struct {

// Status returns an Status value describing this buildlet.
func (c *Client) Status() (Status, error) {
select {
case <-c.peerDead:
return Status{}, c.deadErr
default:
// Continue below.
}
req, err := http.NewRequest("GET", c.URL()+"/status", nil)
if err != nil {
return Status{}, err
Expand Down
8 changes: 8 additions & 0 deletions cmd/buildlet/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ buildlet.windows-amd64: buildlet.go reverse.go buildlet_windows.go
GOOS=windows GOARCH=amd64 go build -o $@
cat $@ | (cd ../upload && go run upload.go --public go-builder-data/$@)

buildlet.linux-arm: buildlet.go reverse.go
GOOS=linux GOARCH=arm go build -o $@
cat $@ | (cd ../upload && go run upload.go --public go-builder-data/$@)

dev-buildlet.linux-arm: buildlet.go reverse.go
GOOS=linux GOARCH=arm go build -o $@
cat $@ | (cd ../upload && go run upload.go --public dev-go-builder-data/buildlet.linux-arm)

dev-buildlet.linux-amd64: buildlet.go reverse.go
GOOS=linux GOARCH=amd64 go build -o $@
cat $@ | (cd ../upload && go run upload.go --public dev-go-builder-data/buildlet.linux-amd64)
Expand Down
50 changes: 42 additions & 8 deletions cmd/buildlet/buildlet.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (

"golang.org/x/build/buildlet"
"golang.org/x/build/envutil"
"golang.org/x/build/pargzip"
"google.golang.org/cloud/compute/metadata"
)

Expand All @@ -48,8 +49,18 @@ var (
listenAddr = flag.String("listen", "AUTO", "address to listen on. Unused in reverse mode. Warning: this service is inherently insecure and offers no protection of its own. Do not expose this port to the world.")
reverse = flag.String("reverse", "", "if non-empty, go into reverse mode where the buildlet dials the coordinator instead of listening for connections. The value is a comma-separated list of modes, e.g. 'darwin-arm,darwin-amd64-race'")
coordinator = flag.String("coordinator", "localhost:8119", "address of coordinator, in production use farmer.golang.org. Only used in reverse mode.")
hostname = flag.String("hostname", "", "hostname to advertise to coordinator for reverse mode; default is actual hostname")
)

// Bump this whenever something notable happens, or when another
// component needs a certain feature. This shows on the coordinator
// per reverse client, and is also accessible via the buildlet
// package's client API (via the Status method).
//
// Notable versions:
// 3: switched to revdial protocol
const buildletVersion = 3

func defaultListenAddr() string {
if runtime.GOOS == "darwin" {
// Darwin will never run on GCE, so let's always
Expand All @@ -74,7 +85,8 @@ func main() {
if runtime.GOOS == "plan9" {
log.SetOutput(&plan9LogWriter{w: os.Stderr})
}
if runtime.GOOS == "linux" && !inKube {
onGCE := metadata.OnGCE()
if runtime.GOOS == "linux" && onGCE && !inKube {
if w, err := os.OpenFile("/dev/console", os.O_WRONLY, 0); err == nil {
log.SetOutput(w)
}
Expand All @@ -88,7 +100,6 @@ func main() {
*listenAddr = v
}

onGCE := metadata.OnGCE()
if !onGCE && !strings.HasPrefix(*listenAddr, "localhost:") {
log.Printf("** WARNING *** This server is unsafe and offers no security. Be careful.")
}
Expand Down Expand Up @@ -128,7 +139,10 @@ func main() {
http.HandleFunc("/debug/goroutines", handleGoroutines)
http.HandleFunc("/debug/x", handleX)

password := metadataValue("password")
var password string
if *reverse == "" {
password = metadataValue("password")
}
requireAuth := func(handler func(w http.ResponseWriter, r *http.Request)) http.Handler {
return requirePasswordHandler{http.HandlerFunc(handler), password}
}
Expand All @@ -145,7 +159,7 @@ func main() {
if *reverse == "" {
listenForCoordinator()
} else {
repeatDialCoordinator()
dialCoordinator()
}
}

Expand Down Expand Up @@ -351,7 +365,7 @@ func handleGetTGZ(w http.ResponseWriter, r *http.Request) {
http.Error(w, "bogus dir", http.StatusBadRequest)
return
}
zw := gzip.NewWriter(w)
zw := pargzip.NewWriter(w)
tw := tar.NewWriter(zw)
base := filepath.Join(*workDir, filepath.FromSlash(dir))
err := filepath.Walk(base, func(path string, fi os.FileInfo, err error) error {
Expand Down Expand Up @@ -442,11 +456,13 @@ func handleWriteTGZ(w http.ResponseWriter, r *http.Request) {
}
res, err := http.Get(urlStr)
if err != nil {
log.Printf("Failed to fetch tgz URL %s: %v", urlStr, err)
http.Error(w, fmt.Sprintf("fetching URL %s: %v", urlStr, err), http.StatusInternalServerError)
return
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
log.Printf("Failed to fetch tgz URL %s: status=%v", urlStr, res.Status)
http.Error(w, fmt.Sprintf("fetching provided url: %s", res.Status), http.StatusInternalServerError)
return
}
Expand All @@ -458,6 +474,7 @@ func handleWriteTGZ(w http.ResponseWriter, r *http.Request) {

err := untar(tgz, baseDir)
if err != nil {
log.Printf("untar failure: %v", err)
status := http.StatusInternalServerError
if he, ok := err.(httpStatuser); ok {
status = he.httpStatus()
Expand Down Expand Up @@ -906,8 +923,25 @@ func handleRemoveAll(w http.ResponseWriter, r *http.Request) {
}
for _, p := range paths {
log.Printf("Removing %s", p)
p = filepath.Join(*workDir, filepath.FromSlash(p))
if err := os.RemoveAll(p); err != nil {
fullDir := filepath.Join(*workDir, filepath.FromSlash(p))
err := os.RemoveAll(fullDir)
if p == "." && err != nil {
// If workDir is a mountpoint and/or contains a binary
// using it, we can get a "Device or resource busy" error.
// See if it's now empty and ignore the error.
if f, oerr := os.Open(*workDir); oerr == nil {
if all, derr := f.Readdirnames(-1); derr == nil && len(all) == 0 {
log.Printf("Ignoring fail of RemoveAll(.)")
err = nil
} else {
log.Printf("Readdir = %q, %f", all, derr)
}
f.Close()
} else {
log.Printf("Failed to open workdir: %v", oerr)
}
}
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand All @@ -933,7 +967,7 @@ func handleStatus(w http.ResponseWriter, r *http.Request) {
return
}
status := buildlet.Status{
Version: 1,
Version: buildletVersion,
}
b, err := json.Marshal(status)
if err != nil {
Expand Down
107 changes: 25 additions & 82 deletions cmd/buildlet/reverse.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"crypto/md5"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io"
"io/ioutil"
Expand All @@ -20,25 +19,20 @@ import (
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"time"

"golang.org/x/build"
"golang.org/x/build/revdial"
)

func repeatDialCoordinator() {
for {
if err := dialCoordinator(); err != nil {
log.Print(err)
}
log.Printf("Waiting 30 seconds and dialing again.")
time.Sleep(30 * time.Second)
}
}

func dialCoordinator() error {
devMode := !strings.HasPrefix(*coordinator, "farmer.golang.org")

if *hostname == "" {
*hostname, _ = os.Hostname()
}

modes := strings.Split(*reverse, ",")
var keys []string
for _, m := range modes {
Expand Down Expand Up @@ -96,88 +90,30 @@ func dialCoordinator() error {
}
req.Header["X-Go-Builder-Type"] = modes
req.Header["X-Go-Builder-Key"] = keys
req.Header.Set("X-Go-Builder-Hostname", *hostname)
req.Header.Set("X-Go-Builder-Version", strconv.Itoa(buildletVersion))
if err := req.Write(conn); err != nil {
return fmt.Errorf("coordinator /reverse request failed: %v", err)
}
resp, err := http.ReadResponse(bufr, req)
if err != nil {
return fmt.Errorf("coordinator /reverse response failed: %v", err)
}
if resp.StatusCode != 200 {
if resp.StatusCode != 101 {
msg, _ := ioutil.ReadAll(resp.Body)
return fmt.Errorf("coordinator registration failed:\n\t%s", msg)
}
resp.Body.Close()

// The client becomes the simple http server.
log.Printf("Connected to coordinator, serving HTTP back at them.")
stateCh := make(chan http.ConnState, 1)
srv := &http.Server{
ConnState: func(_ net.Conn, state http.ConnState) { stateCh <- state },
}
return srv.Serve(&reverseListener{
conn: conn,
stateCh: stateCh,
})
}

// reverseListener serves out a single underlying conn, once.
//
// It is designed to be passed to a *http.Server, which loops
// continually calling Accept. As this reverse connection only
// ever has one connection to hand out, it responds to the first
// Accept, and then blocks on the second Accept.
//
// While blocking on the second Accept, this listener takes on the
// job of checking the health of the original net.Conn it handed out.
// If it goes unused for a while, it closes the original net.Conn
// and returns an error, ending the life of the *http.Server
type reverseListener struct {
done bool
conn net.Conn
stateCh <-chan http.ConnState
log.Printf("Connected to coordinator; reverse dialing active")
srv := &http.Server{}
err = srv.Serve(revdial.NewListener(bufio.NewReadWriter(
bufio.NewReader(conn),
bufio.NewWriter(conn),
)))
log.Printf("Reverse buildlet Serve complete; err=%v", err)
return err
}

func (rl *reverseListener) Accept() (net.Conn, error) {
if !rl.done {
// First call to Accept, return our one net.Conn.
rl.done = true
return rl.conn, nil
}
// Second call to Accept, block until we decide the entire
// server should be torn down.
defer rl.conn.Close()
const timeout = 1 * time.Minute
timer := time.NewTimer(timeout)
var state http.ConnState
for {
select {
case state = <-rl.stateCh:
if state == http.StateClosed {
return nil, errors.New("coordinator connection closed")
}
// The coordinator sends a health check every 30 seconds
// when buildlets are idle. If we go a minute without
// seeing anything, assume the coordinator is in a bad way
// (probably restarted) and close the connection.
case <-timer.C:
if state == http.StateIdle {
return nil, errors.New("coordinator connection unhealthy")
}
}
timer.Reset(timeout)
}
}

func (rl *reverseListener) Close() error { return nil }
func (rl *reverseListener) Addr() net.Addr { return reverseAddr("buildlet") }

// reverseAddr implements net.Addr for reverseListener.
type reverseAddr string

func (a reverseAddr) Network() string { return "reverse" }
func (a reverseAddr) String() string { return "reverse:" + string(a) }

func devBuilderKey(builder string) string {
h := hmac.New(md5.New, []byte("gophers rule"))
io.WriteString(h, builder)
Expand All @@ -188,7 +124,14 @@ func homedir() string {
if runtime.GOOS == "windows" {
return os.Getenv("HOMEDRIVE") + os.Getenv("HOMEPATH")
}
return os.Getenv("HOME")
home := os.Getenv("HOME")
if home != "" {
return home
}
if os.Getuid() == 0 {
return "/root"
}
return "/"
}

// TestDialCoordinator dials the coordinator. Exported for testing.
Expand Down
6 changes: 5 additions & 1 deletion cmd/buildlet/stage0/Makefile
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
buildlet-stage0.windows-amd64: stage0.go
GOOS=windows GOARCH=amd64 go build -o $@
cat $@ | (cd ../../upload && go run upload.go --public go-builder-data/$@)
cat $@ | (cd ../../upload && go run upload.go --public --cacheable=false go-builder-data/$@)

buildlet-stage0.linux-arm-scaleway: stage0.go
GOOS=linux GOARCH=arm go build -o $@
cat $@ | (cd ../../upload && go run upload.go --public --cacheable=false go-builder-data/$@)
Loading

0 comments on commit 1f0d8f2

Please sign in to comment.