From 1f0d8f287ced6ce993e61a71e7b3c57c3d72bc0c Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Mon, 14 Sep 2015 18:33:55 +0000 Subject: [PATCH] all: tons of builder work * reverse buildlet rework (multiplexed TCP connections, instead of a hacky reverse roundtripper) * scaleway ARM image improvements * parallel gzip implementation, which makes things ~8x faster on Scaleway. * merge watcher into the coordinator, for easier deployments Change-Id: I55d769f982e6583b261435309faa1f718a15fde1 Reviewed-on: https://go-review.googlesource.com/12665 Reviewed-by: Brad Fitzpatrick --- buildlet/buildletclient.go | 21 +- cmd/buildlet/Makefile | 8 + cmd/buildlet/buildlet.go | 50 +- cmd/buildlet/reverse.go | 107 +--- cmd/buildlet/stage0/Makefile | 6 +- cmd/buildlet/stage0/stage0.go | 185 ++++++- cmd/coordinator/Makefile | 4 +- cmd/coordinator/buildongce/create.go | 25 +- cmd/coordinator/coordinator.go | 70 ++- cmd/coordinator/reverse.go | 275 +++++----- cmd/coordinator/watcher.go | 73 ++- .../watcher_process.go} | 48 +- cmd/scaleway/scaleway.go | 63 ++- dashboard/builders.go | 83 +-- env/commit-watcher/Dockerfile | 17 - env/commit-watcher/Makefile | 11 - .../scripts/build-commit-watcher.sh | 34 -- env/linux-arm/scaleway/Dockerfile | 46 -- env/linux-arm/scaleway/buildlet.service | 16 + env/linux-arm/scaleway/run-buildlet.sh | 20 - env/watcher-world/Dockerfile | 16 + env/watcher-world/Makefile | 11 + .../scripts/install-apt-deps.sh | 4 - pargzip/pargzip.go | 209 +++++++ pargzip/pargzip_test.go | 52 ++ revdial/revdial.go | 518 ++++++++++++++++++ revdial/revdial_test.go | 189 +++++++ 27 files changed, 1618 insertions(+), 543 deletions(-) rename cmd/{watcher/watcher.go => coordinator/watcher_process.go} (93%) delete mode 100644 env/commit-watcher/Dockerfile delete mode 100644 env/commit-watcher/Makefile delete mode 100755 env/commit-watcher/scripts/build-commit-watcher.sh delete mode 100644 env/linux-arm/scaleway/Dockerfile create mode 100644 env/linux-arm/scaleway/buildlet.service 
delete mode 100644 env/linux-arm/scaleway/run-buildlet.sh create mode 100644 env/watcher-world/Dockerfile create mode 100644 env/watcher-world/Makefile rename env/{commit-watcher => watcher-world}/scripts/install-apt-deps.sh (57%) create mode 100644 pargzip/pargzip.go create mode 100644 pargzip/pargzip_test.go create mode 100644 revdial/revdial.go create mode 100644 revdial/revdial_test.go diff --git a/buildlet/buildletclient.go b/buildlet/buildletclient.go index c251e16d3c..c980ca2a02 100644 --- a/buildlet/buildletclient.go +++ b/buildlet/buildletclient.go @@ -54,8 +54,10 @@ func (c *Client) SetCloseFunc(fn func() error) { c.closeFunc = fn } +var ErrClosed = errors.New("buildlet: Client closed") + func (c *Client) Close() error { - c.setPeerDead(errors.New("Close called")) + c.setPeerDead(ErrClosed) // TODO(bradfitz): split concept of closed vs. broken? var err error if c.closeFunc != nil { err = c.closeFunc() @@ -67,6 +69,10 @@ func (c *Client) Close() error { // To be called only via c.setPeerDeadOnce.Do(s.setPeerDead) func (c *Client) setPeerDead(err error) { c.setPeerDeadOnce.Do(func() { + c.MarkBroken() + if err == nil { + err = errors.New("peer dead (no specific error)") + } c.deadErr = err close(c.peerDead) }) @@ -156,6 +162,13 @@ func (c *Client) URL() string { func (c *Client) IPPort() string { return c.ipPort } +func (c *Client) Name() string { + if c.ipPort != "" { + return c.ipPort + } + return "(unnamed-buildlet)" +} + // MarkBroken marks this client as broken in some way. func (c *Client) MarkBroken() { c.mu.Lock() @@ -571,6 +584,12 @@ type Status struct { // Status returns an Status value describing this buildlet. func (c *Client) Status() (Status, error) { + select { + case <-c.peerDead: + return Status{}, c.deadErr + default: + // Continue below. 
+ } req, err := http.NewRequest("GET", c.URL()+"/status", nil) if err != nil { return Status{}, err diff --git a/cmd/buildlet/Makefile b/cmd/buildlet/Makefile index 1c425e5aaf..4090f00f9a 100644 --- a/cmd/buildlet/Makefile +++ b/cmd/buildlet/Makefile @@ -33,6 +33,14 @@ buildlet.windows-amd64: buildlet.go reverse.go buildlet_windows.go GOOS=windows GOARCH=amd64 go build -o $@ cat $@ | (cd ../upload && go run upload.go --public go-builder-data/$@) +buildlet.linux-arm: buildlet.go reverse.go + GOOS=linux GOARCH=arm go build -o $@ + cat $@ | (cd ../upload && go run upload.go --public go-builder-data/$@) + +dev-buildlet.linux-arm: buildlet.go reverse.go + GOOS=linux GOARCH=arm go build -o $@ + cat $@ | (cd ../upload && go run upload.go --public dev-go-builder-data/buildlet.linux-arm) + dev-buildlet.linux-amd64: buildlet.go reverse.go GOOS=linux GOARCH=amd64 go build -o $@ cat $@ | (cd ../upload && go run upload.go --public dev-go-builder-data/buildlet.linux-amd64) diff --git a/cmd/buildlet/buildlet.go b/cmd/buildlet/buildlet.go index 2488914e22..5052e3269f 100644 --- a/cmd/buildlet/buildlet.go +++ b/cmd/buildlet/buildlet.go @@ -39,6 +39,7 @@ import ( "golang.org/x/build/buildlet" "golang.org/x/build/envutil" + "golang.org/x/build/pargzip" "google.golang.org/cloud/compute/metadata" ) @@ -48,8 +49,18 @@ var ( listenAddr = flag.String("listen", "AUTO", "address to listen on. Unused in reverse mode. Warning: this service is inherently insecure and offers no protection of its own. Do not expose this port to the world.") reverse = flag.String("reverse", "", "if non-empty, go into reverse mode where the buildlet dials the coordinator instead of listening for connections. The value is a comma-separated list of modes, e.g. 'darwin-arm,darwin-amd64-race'") coordinator = flag.String("coordinator", "localhost:8119", "address of coordinator, in production use farmer.golang.org. 
Only used in reverse mode.") + hostname = flag.String("hostname", "", "hostname to advertise to coordinator for reverse mode; default is actual hostname") ) +// Bump this whenever something notable happens, or when another +// component needs a certain feature. This shows on the coordinator +// per reverse client, and is also accessible via the buildlet +// package's client API (via the Status method). +// +// Notable versions: +// 3: switched to revdial protocol +const buildletVersion = 3 + func defaultListenAddr() string { if runtime.GOOS == "darwin" { // Darwin will never run on GCE, so let's always @@ -74,7 +85,8 @@ func main() { if runtime.GOOS == "plan9" { log.SetOutput(&plan9LogWriter{w: os.Stderr}) } - if runtime.GOOS == "linux" && !inKube { + onGCE := metadata.OnGCE() + if runtime.GOOS == "linux" && onGCE && !inKube { if w, err := os.OpenFile("/dev/console", os.O_WRONLY, 0); err == nil { log.SetOutput(w) } @@ -88,7 +100,6 @@ func main() { *listenAddr = v } - onGCE := metadata.OnGCE() if !onGCE && !strings.HasPrefix(*listenAddr, "localhost:") { log.Printf("** WARNING *** This server is unsafe and offers no security. 
Be careful.") } @@ -128,7 +139,10 @@ func main() { http.HandleFunc("/debug/goroutines", handleGoroutines) http.HandleFunc("/debug/x", handleX) - password := metadataValue("password") + var password string + if *reverse == "" { + password = metadataValue("password") + } requireAuth := func(handler func(w http.ResponseWriter, r *http.Request)) http.Handler { return requirePasswordHandler{http.HandlerFunc(handler), password} } @@ -145,7 +159,7 @@ func main() { if *reverse == "" { listenForCoordinator() } else { - repeatDialCoordinator() + dialCoordinator() } } @@ -351,7 +365,7 @@ func handleGetTGZ(w http.ResponseWriter, r *http.Request) { http.Error(w, "bogus dir", http.StatusBadRequest) return } - zw := gzip.NewWriter(w) + zw := pargzip.NewWriter(w) tw := tar.NewWriter(zw) base := filepath.Join(*workDir, filepath.FromSlash(dir)) err := filepath.Walk(base, func(path string, fi os.FileInfo, err error) error { @@ -442,11 +456,13 @@ func handleWriteTGZ(w http.ResponseWriter, r *http.Request) { } res, err := http.Get(urlStr) if err != nil { + log.Printf("Failed to fetch tgz URL %s: %v", urlStr, err) http.Error(w, fmt.Sprintf("fetching URL %s: %v", urlStr, err), http.StatusInternalServerError) return } defer res.Body.Close() if res.StatusCode != http.StatusOK { + log.Printf("Failed to fetch tgz URL %s: status=%v", urlStr, res.Status) http.Error(w, fmt.Sprintf("fetching provided url: %s", res.Status), http.StatusInternalServerError) return } @@ -458,6 +474,7 @@ func handleWriteTGZ(w http.ResponseWriter, r *http.Request) { err := untar(tgz, baseDir) if err != nil { + log.Printf("untar failure: %v", err) status := http.StatusInternalServerError if he, ok := err.(httpStatuser); ok { status = he.httpStatus() @@ -906,8 +923,25 @@ func handleRemoveAll(w http.ResponseWriter, r *http.Request) { } for _, p := range paths { log.Printf("Removing %s", p) - p = filepath.Join(*workDir, filepath.FromSlash(p)) - if err := os.RemoveAll(p); err != nil { + fullDir := filepath.Join(*workDir, 
filepath.FromSlash(p)) + err := os.RemoveAll(fullDir) + if p == "." && err != nil { + // If workDir is a mountpoint and/or contains a binary + // using it, we can get a "Device or resource busy" error. + // See if it's now empty and ignore the error. + if f, oerr := os.Open(*workDir); oerr == nil { + if all, derr := f.Readdirnames(-1); derr == nil && len(all) == 0 { + log.Printf("Ignoring fail of RemoveAll(.)") + err = nil + } else { + log.Printf("Readdir = %q, %f", all, derr) + } + f.Close() + } else { + log.Printf("Failed to open workdir: %v", oerr) + } + } + if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -933,7 +967,7 @@ func handleStatus(w http.ResponseWriter, r *http.Request) { return } status := buildlet.Status{ - Version: 1, + Version: buildletVersion, } b, err := json.Marshal(status) if err != nil { diff --git a/cmd/buildlet/reverse.go b/cmd/buildlet/reverse.go index d70a12167c..da800f8868 100644 --- a/cmd/buildlet/reverse.go +++ b/cmd/buildlet/reverse.go @@ -10,7 +10,6 @@ import ( "crypto/md5" "crypto/tls" "crypto/x509" - "errors" "fmt" "io" "io/ioutil" @@ -20,25 +19,20 @@ import ( "os" "path/filepath" "runtime" + "strconv" "strings" - "time" "golang.org/x/build" + "golang.org/x/build/revdial" ) -func repeatDialCoordinator() { - for { - if err := dialCoordinator(); err != nil { - log.Print(err) - } - log.Printf("Waiting 30 seconds and dialing again.") - time.Sleep(30 * time.Second) - } -} - func dialCoordinator() error { devMode := !strings.HasPrefix(*coordinator, "farmer.golang.org") + if *hostname == "" { + *hostname, _ = os.Hostname() + } + modes := strings.Split(*reverse, ",") var keys []string for _, m := range modes { @@ -96,6 +90,8 @@ func dialCoordinator() error { } req.Header["X-Go-Builder-Type"] = modes req.Header["X-Go-Builder-Key"] = keys + req.Header.Set("X-Go-Builder-Hostname", *hostname) + req.Header.Set("X-Go-Builder-Version", strconv.Itoa(buildletVersion)) if err := req.Write(conn); err != nil { 
return fmt.Errorf("coordinator /reverse request failed: %v", err) } @@ -103,81 +99,21 @@ func dialCoordinator() error { if err != nil { return fmt.Errorf("coordinator /reverse response failed: %v", err) } - if resp.StatusCode != 200 { + if resp.StatusCode != 101 { msg, _ := ioutil.ReadAll(resp.Body) return fmt.Errorf("coordinator registration failed:\n\t%s", msg) } - resp.Body.Close() - // The client becomes the simple http server. - log.Printf("Connected to coordinator, serving HTTP back at them.") - stateCh := make(chan http.ConnState, 1) - srv := &http.Server{ - ConnState: func(_ net.Conn, state http.ConnState) { stateCh <- state }, - } - return srv.Serve(&reverseListener{ - conn: conn, - stateCh: stateCh, - }) -} - -// reverseListener serves out a single underlying conn, once. -// -// It is designed to be passed to a *http.Server, which loops -// continually calling Accept. As this reverse connection only -// ever has one connection to hand out, it responds to the first -// Accept, and then blocks on the second Accept. -// -// While blocking on the second Accept, this listener takes on the -// job of checking the health of the original net.Conn it handed out. -// If it goes unused for a while, it closes the original net.Conn -// and returns an error, ending the life of the *http.Server -type reverseListener struct { - done bool - conn net.Conn - stateCh <-chan http.ConnState + log.Printf("Connected to coordinator; reverse dialing active") + srv := &http.Server{} + err = srv.Serve(revdial.NewListener(bufio.NewReadWriter( + bufio.NewReader(conn), + bufio.NewWriter(conn), + ))) + log.Printf("Reverse buildlet Serve complete; err=%v", err) + return err } -func (rl *reverseListener) Accept() (net.Conn, error) { - if !rl.done { - // First call to Accept, return our one net.Conn. - rl.done = true - return rl.conn, nil - } - // Second call to Accept, block until we decide the entire - // server should be torn down. 
- defer rl.conn.Close() - const timeout = 1 * time.Minute - timer := time.NewTimer(timeout) - var state http.ConnState - for { - select { - case state = <-rl.stateCh: - if state == http.StateClosed { - return nil, errors.New("coordinator connection closed") - } - // The coordinator sends a health check every 30 seconds - // when buildlets are idle. If we go a minute without - // seeing anything, assume the coordinator is in a bad way - // (probably restarted) and close the connection. - case <-timer.C: - if state == http.StateIdle { - return nil, errors.New("coordinator connection unhealthy") - } - } - timer.Reset(timeout) - } -} - -func (rl *reverseListener) Close() error { return nil } -func (rl *reverseListener) Addr() net.Addr { return reverseAddr("buildlet") } - -// reverseAddr implements net.Addr for reverseListener. -type reverseAddr string - -func (a reverseAddr) Network() string { return "reverse" } -func (a reverseAddr) String() string { return "reverse:" + string(a) } - func devBuilderKey(builder string) string { h := hmac.New(md5.New, []byte("gophers rule")) io.WriteString(h, builder) @@ -188,7 +124,14 @@ func homedir() string { if runtime.GOOS == "windows" { return os.Getenv("HOMEDRIVE") + os.Getenv("HOMEPATH") } - return os.Getenv("HOME") + home := os.Getenv("HOME") + if home != "" { + return home + } + if os.Getuid() == 0 { + return "/root" + } + return "/" } // TestDialCoordinator dials the coordinator. Exported for testing. 
diff --git a/cmd/buildlet/stage0/Makefile b/cmd/buildlet/stage0/Makefile index 81d78b0287..6c2127e070 100644 --- a/cmd/buildlet/stage0/Makefile +++ b/cmd/buildlet/stage0/Makefile @@ -1,3 +1,7 @@ buildlet-stage0.windows-amd64: stage0.go GOOS=windows GOARCH=amd64 go build -o $@ - cat $@ | (cd ../../upload && go run upload.go --public go-builder-data/$@) + cat $@ | (cd ../../upload && go run upload.go --public --cacheable=false go-builder-data/$@) + +buildlet-stage0.linux-arm-scaleway: stage0.go + GOOS=linux GOARCH=arm go build -o $@ + cat $@ | (cd ../../upload && go run upload.go --public --cacheable=false go-builder-data/$@) diff --git a/cmd/buildlet/stage0/stage0.go b/cmd/buildlet/stage0/stage0.go index 7144b22a7f..c2aea2c63f 100644 --- a/cmd/buildlet/stage0/stage0.go +++ b/cmd/buildlet/stage0/stage0.go @@ -2,17 +2,20 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// The stage0 command looks up the buildlet's URL from the GCE -// metadata service, downloads it, and runs it. If not on GCE, such as -// when in a Linux Docker container being developed and tested -// locally, the stage0 instead looks for the META_BUILDLET_BINARY_URL -// environment to have a URL to the buildlet binary. +// The stage0 command looks up the buildlet's URL from its environment +// (GCE metadata service, scaleway, etc), downloads it, and runs +// it. If not on GCE, such as when in a Linux Docker container being +// developed and tested locally, the stage0 instead looks for the +// META_BUILDLET_BINARY_URL environment to have a URL to the buildlet +// binary. 
package main import ( + "encoding/json" "flag" "fmt" "io" + "io/ioutil" "log" "net" "net/http" @@ -20,6 +23,7 @@ import ( "os/exec" "path/filepath" "runtime" + "strings" "time" "google.golang.org/cloud/compute/metadata" @@ -32,9 +36,20 @@ var networkWait = flag.Duration("network-wait", 0, "if non-zero, the time to wai const attr = "buildlet-binary-url" +var ( + onScaleway bool + scalewayMeta scalewayMetadata +) + func main() { flag.Parse() + if runtime.GOOS == "linux" && runtime.GOARCH == "arm" { + if _, err := os.Stat("/usr/local/bin/oc-metadata"); err == nil { + initScaleway() + } + } + if !awaitNetwork() { sleepFatalf("network didn't become reachable") } @@ -54,11 +69,46 @@ func main() { cmd := exec.Command(target) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr + if onScaleway { + cmd.Args = append(cmd.Args, scalewayBuildletArgs()...) + } if err := cmd.Run(); err != nil { sleepFatalf("Error running buildlet: %v", err) } } +func scalewayBuildletArgs() []string { + var modes []string // e.g. "linux-arm", "linux-arm-arm5" + // tags are of form "buildkey_linux-arm_HEXHEXHEX" + for _, tag := range scalewayMeta.Tags { + if strings.HasPrefix(tag, "buildkey_") { + parts := strings.Split(tag, "_") + if len(parts) != 3 { + log.Fatalf("invalid server tag %q", tag) + } + mode, buildkey := parts[1], parts[2] + modes = append(modes, mode) + file := "/root/.gobuildkey-" + mode + if fi, err := os.Stat(file); err != nil || (err == nil && fi.Size() == 0) { + if err := ioutil.WriteFile(file, []byte(buildkey), 0600); err != nil { + log.Fatal(err) + } + } + } + } + server := "farmer.golang.org:443" + if scalewayMeta.IsStaging() { + server = "104.154.113.235:443" // fixed IP, but no hostname. 
+ } + return []string{ + "--workdir=/workdir", + "--hostname=" + scalewayMeta.Hostname, + "--halt=false", + "--reverse=" + strings.Join(modes, ","), + "--coordinator=" + server, + } +} + // awaitNetwork reports whether the network came up within 30 seconds, // determined somewhat arbitrarily via a DNS lookup for google.com. func awaitNetwork() bool { @@ -78,6 +128,13 @@ func buildletURL() string { if v := os.Getenv("META_BUILDLET_BINARY_URL"); v != "" { return v } + if onScaleway { + if scalewayMeta.IsStaging() { + return "https://storage.googleapis.com/dev-go-builder-data/buildlet.linux-arm" + } else { + return "https://storage.googleapis.com/go-builder-data/buildlet.linux-arm" + } + } sleepFatalf("Not on GCE, and no META_BUILDLET_BINARY_URL specified.") } v, err := metadata.InstanceAttributeValue(attr) @@ -97,6 +154,9 @@ func sleepFatalf(format string, args ...interface{}) { } func download(file, url string) error { + if strings.HasPrefix(url, "https://storage.googleapis.com") { + url += fmt.Sprintf("?%d", time.Now().Unix()) + } log.Printf("Downloading %s to %s ...\n", url, file) var res *http.Response @@ -136,3 +196,118 @@ func download(file, url string) error { log.Printf("Downloaded %s (%d bytes)", file, n) return nil } + +func initScaleway() { + log.Printf("On scaleway.") + onScaleway = true + initScalewaySwap() + initScalewayWorkdir() + initScalewayMeta() + initScalewayGo14() + log.Printf("Scaleway init complete; metadata is %+v", scalewayMeta) +} + +type scalewayMetadata struct { + Name string `json:"name"` + Hostname string `json:"hostname"` + Tags []string `json:"tags"` +} + +// IsStaging reports whether this instance has a "staging" tag. 
+func (m *scalewayMetadata) IsStaging() bool { + for _, t := range m.Tags { + if t == "staging" { + return true + } + } + return false +} + +func initScalewayMeta() { + const metaURL = "http://169.254.42.42/conf?format=json" + res, err := http.Get(metaURL) + if err != nil { + log.Fatalf("failed to get scaleway metadata: %v", err) + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + log.Fatalf("failed to get scaleway metadata from %s: %v", metaURL, res.Status) + } + if err := json.NewDecoder(res.Body).Decode(&scalewayMeta); err != nil { + log.Fatalf("invalid JSON from scaleway metadata URL %s: %v", metaURL, err) + } +} + +func initScalewaySwap() { + const swapFile = "/swapfile" + slurp, _ := ioutil.ReadFile("/proc/swaps") + if strings.Contains(string(slurp), swapFile) { + log.Printf("scaleway swapfile already active.") + return + } + os.Remove(swapFile) // if it already exists, else ignore error + log.Printf("Running fallocate on swapfile") + if out, err := exec.Command("fallocate", "--length", "16GiB", swapFile).CombinedOutput(); err != nil { + log.Fatalf("Failed to fallocate /swapfile: %v, %s", err, out) + } + log.Printf("Running mkswap") + if out, err := exec.Command("mkswap", swapFile).CombinedOutput(); err != nil { + log.Fatalf("Failed to mkswap /swapfile: %v, %s", err, out) + } + os.Chmod(swapFile, 0600) + log.Printf("Running swapon") + if out, err := exec.Command("swapon", swapFile).CombinedOutput(); err != nil { + log.Fatalf("Failed to swapon /swapfile: %v, %s", err, out) + } +} + +func initScalewayWorkdir() { + const dir = "/workdir" + slurp, _ := ioutil.ReadFile("/proc/mounts") + if strings.Contains(string(slurp), dir) { + log.Printf("scaleway workdir already mounted") + return + } + if err := os.MkdirAll("/workdir", 0755); err != nil { + log.Fatal(err) + } + if out, err := exec.Command("mount", + "-t", "tmpfs", + "-o", "size=8589934592", + "tmpfs", "/workdir").CombinedOutput(); err != nil { + log.Fatalf("Failed to mount /buildtmp: %v, 
%s", err, out) + } +} + +func initScalewayGo14() { + if fi, err := os.Stat("/usr/local/go"); err == nil && fi.IsDir() { + log.Printf("go directory already exists.") + return + } + os.RemoveAll("/usr/local/go") // in case it existed somehow, or as regular file + if err := os.RemoveAll("/usr/local/go.tmp"); err != nil { + log.Fatal(err) + } + if err := os.MkdirAll("/usr/local/go.tmp", 0755); err != nil { + log.Fatal(err) + } + log.Printf("Downloading go1.4-linux-arm.tar.gz") + if out, err := exec.Command("curl", + "-o", "/usr/local/go.tmp/go.tar.gz", + "--silent", + "https://storage.googleapis.com/go-builder-data/go1.4-linux-arm.tar.gz", + ).CombinedOutput(); err != nil { + log.Fatalf("Failed to download go1.4-linux-arm.tar.gz: %v, %s", err, out) + } + log.Printf("Extracting go1.4-linux-arm.tar.gz") + if out, err := exec.Command("tar", + "-C", "/usr/local/go.tmp", + "-zx", + "-f", "/usr/local/go.tmp/go.tar.gz", + ).CombinedOutput(); err != nil { + log.Fatalf("Failed to untar go1.4-linux-arm.tar.gz: %v, %s", err, out) + } + if err := os.Rename("/usr/local/go.tmp", "/usr/local/go"); err != nil { + log.Fatal(err) + } +} diff --git a/cmd/coordinator/Makefile b/cmd/coordinator/Makefile index b5e39721e0..5ba370fe53 100644 --- a/cmd/coordinator/Makefile +++ b/cmd/coordinator/Makefile @@ -1,5 +1,5 @@ -coordinator: builders.go coordinator.go dash.go debug.go gce.go remote.go reverse.go status.go watcher.go - GOOS=linux go build --ldflags="-X main.Version $$USER-$$(TZ=UTC date +%FT%T)Z" -o coordinator . +coordinator: builders.go coordinator.go dash.go debug.go gce.go remote.go reverse.go status.go watcher.go watcher_process.go + GOOS=linux go build --ldflags="-X main.Version=$$USER-$$(TZ=UTC date +%FT%T)Z" -o coordinator . 
# After "make upload", either reboot the machine, or ssh to it and: # sudo systemctl restart gobuild.service diff --git a/cmd/coordinator/buildongce/create.go b/cmd/coordinator/buildongce/create.go index 6c6a7b7e18..730cd8d2cb 100644 --- a/cmd/coordinator/buildongce/create.go +++ b/cmd/coordinator/buildongce/create.go @@ -18,24 +18,25 @@ import ( "golang.org/x/oauth2" "golang.org/x/oauth2/google" compute "google.golang.org/api/compute/v1" + "google.golang.org/api/googleapi" ) var ( proj = flag.String("project", "symbolic-datum-552", "name of Project") - zone = flag.String("zone", "us-central1-b", "GCE zone") + zone = flag.String("zone", "us-central1-f", "GCE zone") mach = flag.String("machinetype", "n1-highcpu-2", "Machine type") instName = flag.String("instance_name", "farmer", "Name of VM instance.") sshPub = flag.String("ssh_public_key", "", "ssh public key file to authorize. Can modify later in Google's web UI anyway.") staticIP = flag.String("static_ip", "", "Static IP to use. If empty, automatic.") reuseDisk = flag.Bool("reuse_disk", true, "Whether disk images should be reused between shutdowns/restarts.") - ssd = flag.Bool("ssd", false, "use a solid state disk (faster, more expensive)") + ssd = flag.Bool("ssd", true, "use a solid state disk (faster, more expensive)") coordinator = flag.String("coord", "https://storage.googleapis.com/go-builder-data/coordinator", "Coordinator binary URL") - dev = flag.Bool("dev", false, "change default -project and -coordinator flags to their default dev cluster values, as well as use 'dev-' prefixed OAuth token files.") + staging = flag.Bool("staging", false, "change default -project and -coordinator flags to their default dev cluster values, as well as use 'staging-' prefixed OAuth token files.") ) -func devPrefix() string { - if *dev { - return "dev-" +func stagingPrefix() string { + if *staging { + return "staging-" } return "" } @@ -81,8 +82,8 @@ func main() { oauthConfig = &oauth2.Config{ // The client-id and secret 
should be for an "Installed Application" when using // the CLI. Later we'll use a web application with a callback. - ClientID: readFile(devPrefix() + "client-id.dat"), - ClientSecret: readFile(devPrefix() + "client-secret.dat"), + ClientID: readFile(stagingPrefix() + "client-id.dat"), + ClientSecret: readFile(stagingPrefix() + "client-secret.dat"), Endpoint: google.Endpoint, Scopes: []string{ compute.DevstorageFullControlScope, @@ -93,7 +94,7 @@ func main() { RedirectURL: "urn:ietf:wg:oauth:2.0:oob", } - if *dev { + if *staging { if *proj == "symbolic-datum-552" { *proj = "go-dashboard-dev" } @@ -118,7 +119,7 @@ func main() { prefix := "https://www.googleapis.com/compute/v1/projects/" + *proj machType := prefix + "/zones/" + *zone + "/machineTypes/" + *mach - tokenFileName := devPrefix() + "token.dat" + tokenFileName := stagingPrefix() + "token.dat" tokenFile := tokenCacheFile(tokenFileName) tokenSource := oauth2.ReuseTokenSource(nil, tokenFile) token, err := tokenSource.Token() @@ -187,7 +188,7 @@ func main() { Items: []*compute.MetadataItems{ { Key: "user-data", - Value: cloudConfig, + Value: googleapi.String(cloudConfig), }, }, }, @@ -255,7 +256,7 @@ OpLoop: } func instanceDisk(svc *compute.Service) *compute.AttachedDisk { - const imageURL = "https://www.googleapis.com/compute/v1/projects/coreos-cloud/global/images/coreos-alpha-402-2-0-v20140807" + const imageURL = "https://www.googleapis.com/compute/v1/projects/coreos-cloud/global/images/coreos-stable-723-3-0-v20150804" diskName := *instName + "-coreos-stateless-pd" if *reuseDisk { diff --git a/cmd/coordinator/coordinator.go b/cmd/coordinator/coordinator.go index 50b0d11835..79419a54fe 100644 --- a/cmd/coordinator/coordinator.go +++ b/cmd/coordinator/coordinator.go @@ -52,13 +52,9 @@ var Version string // set by linker -X // finishes before destroying buildlets. const devPause = false -func init() { - // Disabled until we have test sharding. This takes 85+ minutes. 
- // Test sharding is https://github.com/golang/go/issues/10029 - delete(dashboard.Builders, "linux-arm-qemu") -} - var ( + role = flag.String("role", "coordinator", "Which role this binary should run as. Valid options: coordinator, watcher") + masterKeyFile = flag.String("masterkey", "", "Path to builder master key. Else fetched using GCE project attribute 'builder-master-key'.") // TODO(bradfitz): remove this list and just query it from the compute API: @@ -117,13 +113,7 @@ func init() { "plan9-386", "nacl-386", "nacl-amd64p32", - /* "linux-arm-shard_test", - "linux-arm-shard_std_am", - "linux-arm-shard_std_nz", - "linux-arm-shard_runtimecpu", - "linux-arm-shard_cgotest", - "linux-arm-shard_misc", - */ + // "linux-arm", } for _, bname := range tryList { conf, ok := dashboard.Builders[bname] @@ -241,6 +231,16 @@ func (fn eventTimeLoggerFunc) logEventTime(event string, optText ...string) { func main() { flag.Parse() + switch *role { + default: + log.Fatalf("unsupported role %q", *role) + case "watcher": + watcherMain() + panic("watcherMain finished") + case "coordinator": + // fall through + } + log.Printf("coordinator version %q starting", Version) err := initGCE() if err != nil { @@ -262,6 +262,7 @@ func main() { http.HandleFunc("/", handleStatus) http.HandleFunc("/debug/goroutines", handleDebugGoroutines) + http.HandleFunc("/debug/watcher", handleDebugWatcher) http.HandleFunc("/builders", handleBuilders) http.HandleFunc("/temporarylogs", handleLogs) http.HandleFunc("/reverse", handleReverse) @@ -309,6 +310,9 @@ func main() { select { case work := <-workc: if !mayBuildRev(work) { + if inStaging { + log.Printf("may not build %v; skipping", work) + } continue } st, err := newBuild(work) @@ -328,6 +332,7 @@ func main() { func stagingClusterBuilders() map[string]dashboard.BuildConfig { m := map[string]dashboard.BuildConfig{} for _, name := range []string{ + "linux-arm", "linux-amd64", "linux-amd64-race", "windows-amd64-gce", @@ -566,10 +571,17 @@ func 
workaroundFlush(w http.ResponseWriter) { // TODO(bradfitz): it also currently does not support subrepos. func findWorkLoop(work chan<- builderRev) { // Useful for debugging a single run: - if inStaging && false { - work <- builderRev{name: "linux-amd64", rev: "c9778ec302b2e0e0d6027e1e0fca892e428d9657", subName: "tools", subRev: "ac303766f5f240c1796eeea3dc9bf34f1261aa35"} + if inStaging && true { + //work <- builderRev{name: "linux-arm", rev: "c9778ec302b2e0e0d6027e1e0fca892e428d9657", subName: "tools", subRev: "ac303766f5f240c1796eeea3dc9bf34f1261aa35"} //work <- builderRev{name: "linux-amd64", rev: "54789eff385780c54254f822e09505b6222918e2"} //work <- builderRev{name: "windows-amd64-gce", rev: "54789eff385780c54254f822e09505b6222918e2"} + log.Printf("Test work awaiting arm") + for !reversePool.CanBuild("linux-arm") { + time.Sleep(time.Second) + } + log.Printf("Sending test work.") + work <- builderRev{name: "linux-arm", rev: "c1aee8c825c2179ad4959ebf533bf27c4b774d00"} + log.Printf("Sent test work.") // Still run findWork but ignore what it does. 
ignore := make(chan builderRev) @@ -1049,11 +1061,11 @@ func GetBuildlets(cancel Cancel, pool BuildletPool, n int, machineType, rev stri } return } - el.logEventTime("helper_ready") + el.logEventTime("empty_helper_ready", bc.Name()) select { case ch <- bc: case <-cancel: - el.logEventTime("helper_killed_before_use") + el.logEventTime("helper_killed_before_use", bc.Name()) bc.Close() return } @@ -1212,7 +1224,7 @@ func (st *buildStatus) useSnapshot() bool { if st.useSnapshotMemo != nil { return *st.useSnapshotMemo } - b := st.isSubrepo() && st.conf.SplitMakeRun() && st.snapshotExists() + b := st.conf.SplitMakeRun() && st.snapshotExists() st.useSnapshotMemo = &b return b } @@ -1944,33 +1956,34 @@ func (st *buildStatus) runTests(helpers <-chan *buildlet.Client) (remoteErr, err go func() { for helper := range helpers { go func(bc *buildlet.Client) { - defer st.logEventTime("closed_helper", bc.IPPort()) + defer st.logEventTime("closed_helper", bc.Name()) defer bc.Close() defer nukeIfBroken(bc) if devPause { defer time.Sleep(5 * time.Minute) - defer st.logEventTime("DEV_HELPER_SLEEP", bc.IPPort()) + defer st.logEventTime("DEV_HELPER_SLEEP", bc.Name()) } - st.logEventTime("got_helper", bc.String()) + st.logEventTime("got_empty_test_helper", bc.String()) if err := bc.PutTarFromURL(st.snapshotURL(), "go"); err != nil { - log.Printf("failed to extract snapshot for helper %s: %v", bc.IPPort(), err) + log.Printf("failed to extract snapshot for helper %s: %v", bc.Name(), err) return } workDir, err := bc.WorkDir() if err != nil { - log.Printf("error discovering workdir for helper %s: %v", bc.IPPort(), err) + log.Printf("error discovering workdir for helper %s: %v", bc.Name(), err) return } - st.logEventTime("setup_helper", bc.String()) + st.logEventTime("test_helper_set_up", bc.Name()) goroot := st.conf.FilePathJoin(workDir, "go") for !bc.IsBroken() { tis, ok := set.testsToRunBiggestFirst() if !ok { - st.logEventTime("biggest_tests_complete", bc.IPPort()) + 
st.logEventTime("no_new_tests_remain", bc.Name()) return } st.runTestsOnBuildlet(bc, tis, goroot) } + st.logEventTime("test_helper_is_broken", bc.Name()) }(helper) } }() @@ -2061,7 +2074,7 @@ func (st *buildStatus) runTestsOnBuildlet(bc *buildlet.Client, tis []*testItem, panic("only go_test:* tests may be merged") } } - which := fmt.Sprintf("%s: %v", bc.IPPort(), names) + which := fmt.Sprintf("%s: %v", bc.Name(), names) st.logEventTime("start_tests", which) args := []string{"tool", "dist", "test", "--no-rebuild", "--banner=" + banner} @@ -2297,7 +2310,7 @@ type buildStatus struct { startedPinging bool // started pinging the go dashboard events []eventAndTime watcher []*logWatcher - useSnapshotMemo *bool + useSnapshotMemo *bool // if non-nil, memoized result of useSnapshot } func (st *buildStatus) setDone(succeeded bool) { @@ -2322,6 +2335,9 @@ func (st *buildStatus) logf(format string, args ...interface{}) { } func (st *buildStatus) logEventTime(event string, optText ...string) { + if inStaging { + st.logf("%s %v", event, optText) + } st.mu.Lock() defer st.mu.Unlock() switch event { diff --git a/cmd/coordinator/reverse.go b/cmd/coordinator/reverse.go index 989910c98c..96e795373f 100644 --- a/cmd/coordinator/reverse.go +++ b/cmd/coordinator/reverse.go @@ -28,12 +28,12 @@ work, go to: */ import ( - "bufio" "bytes" "errors" "fmt" "io" "log" + "math/rand" "net" "net/http" "sort" @@ -42,6 +42,7 @@ import ( "time" "golang.org/x/build/buildlet" + "golang.org/x/build/revdial" ) const minBuildletVersion = 1 @@ -50,15 +51,6 @@ var reversePool = &reverseBuildletPool{ buildletReturned: make(chan token, 1), } -func init() { - go func() { - for { - time.Sleep(15 * time.Second) - reversePool.reverseHealthCheck() - } - }() -} - type token struct{} type reverseBuildletPool struct { @@ -88,11 +80,7 @@ func (p *reverseBuildletPool) tryToGrab(machineType string) (*buildlet.Client, e b.inUseAs = machineType b.inUseTime = time.Now() b.client.SetCloseFunc(func() error { - p.mu.Lock() - 
b.inUseAs = "" - b.inUseTime = time.Now() - p.mu.Unlock() - p.noteBuildletReturned() + p.nukeBuildlet(b.client) return nil }) return b.client, nil @@ -126,63 +114,63 @@ func (p *reverseBuildletPool) nukeBuildlet(victim *buildlet.Client) { } } -// reverseHealthCheck requests the status page of each idle buildlet. +// healthCheckBuildletLoop periodically requests the status from b. // If the buildlet fails to respond promptly, it is removed from the pool. -func (p *reverseBuildletPool) reverseHealthCheck() { - p.mu.Lock() - responses := make(map[*reverseBuildlet]chan error) - for _, b := range p.buildlets { - if b.inUseAs == "health" { // sanity check - panic("previous health check still running") - } - if b.inUseAs != "" { - continue // skip busy buildlets +func (p *reverseBuildletPool) healthCheckBuildletLoop(b *reverseBuildlet) { + for { + time.Sleep(time.Duration(10+rand.Intn(5)) * time.Second) + if !p.healthCheckBuildlet(b) { + return } - b.inUseAs = "health" - b.inUseTime = time.Now() - res := make(chan error, 1) - responses[b] = res - client := b.client - go func() { - _, err := client.Status() - res <- err - }() } - p.mu.Unlock() - time.Sleep(5 * time.Second) // give buildlets time to respond +} + +func (p *reverseBuildletPool) healthCheckBuildlet(b *reverseBuildlet) bool { p.mu.Lock() + if b.inUseAs == "health" { // sanity check + panic("previous health check still running") + } + if b.inUseAs != "" { + p.mu.Unlock() + return true // skip busy buildlets + } + b.inUseAs = "health" + b.inUseTime = time.Now() + res := make(chan error, 1) + go func() { + _, err := b.client.Status() + res <- err + }() + p.mu.Unlock() - var buildlets []*reverseBuildlet - for _, b := range p.buildlets { - res := responses[b] - if b.inUseAs != "health" || res == nil { - // buildlet skipped or registered after health check - buildlets = append(buildlets, b) - continue - } - b.inUseAs = "" - b.inUseTime = time.Now() - p.noteBuildletReturned() - var err error - select { - case err = 
<-res: - default: - // It had 5 seconds above to send to the - // buffered channel. So if we're here, it took - // over 5 seconds. - err = errors.New("health check timeout") - } - if err == nil { - buildlets = append(buildlets, b) - continue - } + t := time.NewTimer(5 * time.Second) // give buildlets time to respond + var err error + select { + case err = <-res: + t.Stop() + case <-t.C: + err = errors.New("health check timeout") + } + + if err != nil { // remove bad buildlet - log.Printf("Reverse buildlet %s %v not responding, removing from pool", b.client, b.modes) + log.Printf("Health check fail; removing reverse buildlet %s %v: %v", b.client, b.modes, err) go b.client.Close() - go b.conn.Close() + go p.nukeBuildlet(b.client) + return false } - p.buildlets = buildlets - p.mu.Unlock() + + p.mu.Lock() + defer p.mu.Unlock() + + if b.inUseAs != "health" { + // buildlet was grabbed while lock was released; harmless. + return true + } + b.inUseAs = "" + b.inUseTime = time.Now() + p.noteBuildletReturned() + return true } var ( @@ -242,13 +230,13 @@ func (p *reverseBuildletPool) GetBuildlet(cancel Cancel, machineType, rev string } func (p *reverseBuildletPool) cleanedBuildlet(b *buildlet.Client, el eventTimeLogger) (*buildlet.Client, error) { - el.logEventTime("got_machine") + el.logEventTime("got_machine", b.String()) // Clean up any files from previous builds. if err := b.RemoveAll("."); err != nil { b.Close() return nil, err } - el.logEventTime("cleaned_up") + el.logEventTime("cleaned_up", b.Name()) return b, nil } @@ -266,13 +254,21 @@ func (p *reverseBuildletPool) WriteHTMLStatus(w io.Writer) { var machineBuf bytes.Buffer p.mu.Lock() - for _, b := range p.buildlets { + buildlets := append([]*reverseBuildlet(nil), p.buildlets...) + sort.Sort(byModeThenHostname(buildlets)) + for _, b := range buildlets { machStatus := "idle" if b.inUseAs != "" { machStatus = "working as " + b.inUseAs + "" } - fmt.Fprintf(&machineBuf, "
  • %s, %s: %s for %v
  • \n", - b.conn.RemoteAddr(), strings.Join(b.modes, ", "), machStatus, time.Since(b.inUseTime)) + fmt.Fprintf(&machineBuf, "
  • %s (%s) version %s, %s: connected %v, %s for %v
  • \n", + b.hostname, + b.conn.RemoteAddr(), + b.version, + strings.Join(b.modes, ", "), + time.Since(b.regTime), + machStatus, + time.Since(b.inUseTime)) for _, mode := range b.modes { if b.inUseAs != "" && b.inUseAs != "health" { if mode == b.inUseAs { @@ -357,11 +353,28 @@ func (p *reverseBuildletPool) CanBuild(mode string) bool { return false } +func (p *reverseBuildletPool) addBuildlet(b *reverseBuildlet) { + p.mu.Lock() + defer p.mu.Unlock() + p.buildlets = append(p.buildlets, b) + go p.healthCheckBuildletLoop(b) +} + // reverseBuildlet is a registered reverse buildlet. // Its immediate fields are guarded by the reverseBuildletPool mutex. type reverseBuildlet struct { - client *buildlet.Client - conn net.Conn + // hostname is the name of the buildlet host. + // It doesn't have to be a complete DNS name. + hostname string + // version is the reverse buildlet's version. + version string + + // sessRand is the unique random number for every unique buildlet session. + sessRand string + + client *buildlet.Client + conn net.Conn + regTime time.Time // when it was first connected // modes is the set of valid modes for this buildlet. // @@ -380,6 +393,13 @@ type reverseBuildlet struct { inUseTime time.Time } +func (b *reverseBuildlet) firstMode() string { + if len(b.modes) == 0 { + return "" + } + return b.modes[0] +} + func handleReverse(w http.ResponseWriter, r *http.Request) { if r.TLS == nil { http.Error(w, "buildlet registration requires SSL", http.StatusInternalServerError) @@ -404,20 +424,28 @@ func handleReverse(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusInternalServerError) return } - log.Printf("Registering reverse buildlet %s for modes %v", r.RemoteAddr, modes) + hostname := r.Header.Get("X-Go-Builder-Hostname") + + revDialer := revdial.NewDialer(bufrw, conn) - // The server becomes a (very simple) http client. 
- (&http.Response{StatusCode: 200, Proto: "HTTP/1.1"}).Write(conn) + log.Printf("Registering reverse buildlet %q (%s) for modes %v", hostname, r.RemoteAddr, modes) - client := buildlet.NewClient("none", buildlet.NoKeyPair) + (&http.Response{StatusCode: http.StatusSwitchingProtocols, Proto: "HTTP/1.1"}).Write(conn) + + client := buildlet.NewClient(hostname, buildlet.NoKeyPair) client.SetHTTPClient(&http.Client{ - Transport: newRoundTripper(client, conn, bufrw), + Transport: &http.Transport{ + Dial: func(network, addr string) (net.Conn, error) { + return revDialer.Dial() + }, + }, }) - client.SetDescription(fmt.Sprintf("reverse peer %s for modes %v", r.RemoteAddr, modes)) + client.SetDescription(fmt.Sprintf("reverse peer %s/%s for modes %v", hostname, r.RemoteAddr, modes)) tstatus := time.Now() status, err := client.Status() if err != nil { - log.Printf("Reverse connection %s for modes %v did not answer status after %v: %v", r.RemoteAddr, modes, time.Since(tstatus), err) + log.Printf("Reverse connection %s/%s for modes %v did not answer status after %v: %v", + hostname, r.RemoteAddr, modes, time.Since(tstatus), err) conn.Close() return } @@ -426,91 +454,32 @@ func handleReverse(w http.ResponseWriter, r *http.Request) { conn.Close() return } - log.Printf("Buildlet %s: %+v for %s", r.RemoteAddr, status, modes) + log.Printf("Buildlet %s/%s: %+v for %s", hostname, r.RemoteAddr, status, modes) - // TODO(crawshaw): unregister buildlet when it disconnects. Maybe just - // periodically request Status, and if there's no response unregister. 
- reversePool.mu.Lock() - defer reversePool.mu.Unlock() + now := time.Now() b := &reverseBuildlet{ + hostname: hostname, + version: r.Header.Get("X-Go-Builder-Version"), modes: modes, client: client, conn: conn, - inUseTime: time.Now(), + inUseTime: now, + regTime: now, } - reversePool.buildlets = append(reversePool.buildlets, b) - registerBuildlet(modes) + reversePool.addBuildlet(b) + registerBuildlet(modes) // testing only } var registerBuildlet = func(modes []string) {} // test hook -func newRoundTripper(bc *buildlet.Client, conn net.Conn, bufrw *bufio.ReadWriter) *reverseRoundTripper { - return &reverseRoundTripper{ - bc: bc, - conn: conn, - bufrw: bufrw, - sema: make(chan bool, 1), - } -} +type byModeThenHostname []*reverseBuildlet -// reverseRoundTripper is an http client that serializes all requests -// over a *bufio.ReadWriter. -// -// Attempts at concurrent requests return an error. -type reverseRoundTripper struct { - bc *buildlet.Client - conn net.Conn - bufrw *bufio.ReadWriter - sema chan bool -} - -func (c *reverseRoundTripper) RoundTrip(req *http.Request) (resp *http.Response, err error) { - // Serialize trips. It is up to callers to avoid deadlocking. 
- c.sema <- true - if err := req.Write(c.bufrw); err != nil { - go c.conn.Close() - <-c.sema - return nil, err - } - if err := c.bufrw.Flush(); err != nil { - go c.conn.Close() - <-c.sema - return nil, err +func (s byModeThenHostname) Len() int { return len(s) } +func (s byModeThenHostname) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s byModeThenHostname) Less(i, j int) bool { + bi, bj := s[i], s[j] + if bi.firstMode() < bj.firstMode() { + return true } - resp, err = http.ReadResponse(c.bufrw.Reader, req) - if err != nil { - go c.conn.Close() - <-c.sema - return nil, err - } - resp.Body = &reverseLockedBody{c, resp.Body, c.sema} - return resp, err -} - -type reverseLockedBody struct { - rt *reverseRoundTripper - body io.ReadCloser - sema chan bool -} - -func (b *reverseLockedBody) Read(p []byte) (n int, err error) { - n, err = b.body.Read(p) - if err != nil && err != io.EOF { - go b.rt.conn.Close() - } - return -} - -func (b *reverseLockedBody) Close() error { - // Set a timer to hard-nuke the connection in case b.body.Close hangs, - // as seen in Issue 11869. - t := time.AfterFunc(5*time.Second, func() { - reversePool.nukeBuildlet(b.rt.bc) - go b.rt.conn.Close() // redundant if nukeBuildlet did it, but harmless. - }) - err := b.body.Close() - t.Stop() - <-b.sema - b.body = nil // prevent double close - return err + return bi.hostname < bj.hostname } diff --git a/cmd/coordinator/watcher.go b/cmd/coordinator/watcher.go index c4dd2d0689..e6835758c6 100644 --- a/cmd/coordinator/watcher.go +++ b/cmd/coordinator/watcher.go @@ -16,6 +16,7 @@ import ( "net/url" "os" "os/exec" + "regexp" "strings" "sync" "time" @@ -43,8 +44,16 @@ type imageInfo struct { lastMod string } +// watcherDockerImage is the Docker container we run in. This +// "go-watcher-world" container doesn't actually contain the watcher +// binary itself; instead, the watcher binary is this coordinator +// binary, which we bind mount into the world with "docker run -v". 
+// That way we only need to update the Docker environment when there
+// are things we need (git, etc).
+const watcherDockerImage = "go-watcher-world"
+
 var images = map[string]*imageInfo{
-	"go-commit-watcher": {url: "https://storage.googleapis.com/go-builder-data/docker-commit-watcher.tar.gz"},
+	watcherDockerImage: {url: "https://storage.googleapis.com/go-builder-data/docker-watcher-world.tar.gz"},
 }
 
 const gitArchiveAddr = "127.0.0.1:21536" // 21536 == keys above WATCH
@@ -54,6 +63,10 @@ func startWatchers() {
 	if inStaging {
 		mirrorBase = "" // don't mirror from dev cluster
 	}
+	const bradIsGrumpy = true
+	if bradIsGrumpy {
+		mirrorBase = "" // one fewer thing to crash; TODO(adg): fix.
+	}
 	addWatcher(watchConfig{
 		repo: "https://go.googlesource.com/go",
 		dash: dashBase(),
@@ -61,7 +74,11 @@
 		netHost:  true,
 		httpAddr: gitArchiveAddr,
 	})
-	addWatcher(watchConfig{repo: "https://go.googlesource.com/gofrontend", dash: dashBase() + "gccgo/"})
+	if false {
+		// TODO(cmang,adg): only use one watcher or the other, depending on which build
+		// coordinator is in use.
+		addWatcher(watchConfig{repo: "https://go.googlesource.com/gofrontend", dash: dashBase() + "gccgo/"})
+	}
 
 	go cleanUpOldContainers()
 
@@ -103,17 +120,19 @@ func (conf watchConfig) dockerRunArgs() (args []string) {
 		// TODO(adg): fix images that look in the wrong place.
args = append(args, "-v", tmpKey+":/.gobuildkey") args = append(args, "-v", tmpKey+":/root/.gobuildkey") + args = append(args, "-v", os.Args[0]+":/usr/local/bin/watcher") } if conf.netHost { args = append(args, "--net=host") } args = append(args, - "go-commit-watcher", + watcherDockerImage, "/usr/local/bin/watcher", - "-repo="+conf.repo, - "-dash="+conf.dash, - "-poll="+conf.interval.String(), - "-http="+conf.httpAddr, + "-role=watcher", + "-watcher.repo="+conf.repo, + "-watcher.dash="+conf.dash, + "-watcher.poll="+conf.interval.String(), + "-watcher.http="+conf.httpAddr, ) if conf.mirrorBase != "" { dst, err := url.Parse(conf.mirrorBase) @@ -183,6 +202,27 @@ func condUpdateImage(img string) error { return nil } +var ( + watchLogMu sync.Mutex + watchLastFail = map[string]string{} // repo -> logs + watchContainer = map[string]string{} // repo -> container +) + +var matchTokens = regexp.MustCompile(`\b[0-9a-f]{40}\b`) + +func handleDebugWatcher(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + watchLogMu.Lock() + defer watchLogMu.Unlock() + for repo, logs := range watchLastFail { + fmt.Fprintf(w, "============== Watcher %s, last fail:\n%s\n\n", repo, matchTokens.ReplaceAllString(logs, "---40hexomitted---")) + } + for repo, container := range watchContainer { + logs, _ := exec.Command("docker", "logs", container).CombinedOutput() + fmt.Fprintf(w, "============== Watcher %s, current container logs:\n%s\n\n", repo, matchTokens.ReplaceAll(logs, []byte("---40hexomitted---"))) + } +} + func startWatching(conf watchConfig) (err error) { defer func() { if err != nil { @@ -190,7 +230,7 @@ func startWatching(conf watchConfig) (err error) { } }() log.Printf("Starting watcher for %v", conf.repo) - if err := condUpdateImage("go-commit-watcher"); err != nil { + if err := condUpdateImage(watcherDockerImage); err != nil { log.Printf("Failed to setup container for commit watcher: %v", err) return err } @@ -201,16 +241,29 @@ func 
startWatching(conf watchConfig) (err error) { log.Printf("Docker run for commit watcher = err:%v, output: %s", err, all) return err } + container := strings.TrimSpace(string(all)) + + watchLogMu.Lock() + watchContainer[conf.repo] = container + watchLogMu.Unlock() + // Start a goroutine to wait for the watcher to die. go func() { exec.Command("docker", "wait", container).Run() out, _ := exec.Command("docker", "logs", container).CombinedOutput() exec.Command("docker", "rm", "-v", container).Run() - const maxLogBytes = 1 << 10 + const maxLogBytes = 512 << 10 if len(out) > maxLogBytes { - out = out[len(out)-maxLogBytes:] + var partial bytes.Buffer + partial.Write(out[:maxLogBytes/2]) + partial.WriteString("\n...(omitted)...\n") + partial.Write(out[len(out)-(maxLogBytes/2):]) + out = partial.Bytes() } + watchLogMu.Lock() + watchLastFail[conf.repo] = string(out) + watchLogMu.Unlock() log.Printf("Watcher %v crashed. Restarting soon. Logs: %s", conf.repo, out) restartWatcherSoon(conf) }() diff --git a/cmd/watcher/watcher.go b/cmd/coordinator/watcher_process.go similarity index 93% rename from cmd/watcher/watcher.go rename to cmd/coordinator/watcher_process.go index 183d3cc5ef..3fcfebffec 100644 --- a/cmd/watcher/watcher.go +++ b/cmd/coordinator/watcher_process.go @@ -2,9 +2,12 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// Command watcher watches the specified repository for new commits -// and reports them to the build dashboard. -package main // import "golang.org/x/build/cmd/watcher" +// The watcher binary watches the specified repositories for new +// commits and reports them to the build dashboard. This binary is +// compiled in to the coordinator binary and runs in a Docker +// container (see env/watcher-world) via the coordinator. 
+ +package main import ( "bufio" @@ -39,15 +42,15 @@ const ( ) var ( - repoURL = flag.String("repo", goBase+"go", "Repository URL") - dashboard = flag.String("dash", "https://build.golang.org/", "Dashboard URL (must end in /)") - keyFile = flag.String("key", defaultKeyFile, "Build dashboard key file") - pollInterval = flag.Duration("poll", 10*time.Second, "Remote repo poll interval") - network = flag.Bool("network", true, "Enable network calls (disable for testing)") - mirrorBase = flag.String("mirror", "", `Mirror repository base URL (eg "https://github.com/golang/")`) - filter = flag.String("filter", "", "Comma-separated list of directories or files to watch for new commits (only works on main repo)") - httpAddr = flag.String("http", "", "If non-empty, the listen address to run an HTTP server on") - report = flag.Bool("report", true, "Report updates to build dashboard (use false for development dry-run mode)") + repoURL = flag.String("watcher.repo", goBase+"go", "Repository URL") + dashFlag = flag.String("watcher.dash", "https://build.golang.org/", "Dashboard URL (must end in /)") + keyFile = flag.String("watcher.key", defaultKeyFile, "Build dashboard key file") + pollInterval = flag.Duration("watcher.poll", 10*time.Second, "Remote repo poll interval") + network = flag.Bool("watcher.network", true, "Enable network calls (disable for testing)") + mirrorBase = flag.String("watcher.mirror", "", `Mirror repository base URL (eg "https://github.com/golang/")`) + filter = flag.String("watcher.filter", "", "Comma-separated list of directories or files to watch for new commits (only works on main repo)") + httpAddr = flag.String("watcher.http", "", "If non-empty, the listen address to run an HTTP server on") + report = flag.Bool("watcher.report", true, "Report updates to build dashboard (use false for development dry-run mode)") ) var ( @@ -56,19 +59,18 @@ var ( networkSeen = make(map[string]bool) // track known hashes for testing ) -func main() { - flag.Parse() +func 
watcherMain() { + log.Printf("Running watcher role.") go pollGerritAndTickle() - - err := run() - fmt.Fprintln(os.Stderr, err) + err := runWatcher() + log.Printf("Watcher exiting after failure: %v", err) os.Exit(1) } -// run is a little wrapper so we can use defer and return to signal +// runWatcher is a little wrapper so we can use defer and return to signal // errors. It should only return a non-nil error. -func run() error { - if !strings.HasSuffix(*dashboard, "/") { +func runWatcher() error { + if !strings.HasSuffix(*dashFlag, "/") { return errors.New("dashboard URL (-dashboard) must end in /") } @@ -421,7 +423,7 @@ func (r *Repo) postCommit(c *Commit) error { } v := url.Values{"version": {fmt.Sprint(watcherVersion)}, "key": {dashboardKey}} - u := *dashboard + "commit?" + v.Encode() + u := *dashFlag + "commit?" + v.Encode() resp, err := http.Post(u, "text/json", bytes.NewReader(b)) if err != nil { return err @@ -574,7 +576,7 @@ func (r *Repo) dashSeen(hash string) (bool, error) { return networkSeen[hash], nil } v := url.Values{"hash": {hash}, "packagePath": {r.path}} - u := *dashboard + "commit?" + v.Encode() + u := *dashFlag + "commit?" 
+ v.Encode() resp, err := http.Get(u) if err != nil { return false, err @@ -847,7 +849,7 @@ func subrepoList() ([]string, error) { return nil, nil } - r, err := http.Get(*dashboard + "packages?kind=subrepo") + r, err := http.Get(*dashFlag + "packages?kind=subrepo") if err != nil { return nil, fmt.Errorf("subrepo list: %v", err) } diff --git a/cmd/scaleway/scaleway.go b/cmd/scaleway/scaleway.go index fe977286ae..2b9d84a5e7 100644 --- a/cmd/scaleway/scaleway.go +++ b/cmd/scaleway/scaleway.go @@ -7,6 +7,8 @@ package main import ( "bytes" + "crypto/hmac" + "crypto/md5" "encoding/json" "flag" "fmt" @@ -19,14 +21,30 @@ import ( ) var ( - token = flag.String("token", "", "API token") - org = flag.String("org", "1f34701d-668b-441b-bf08-0b13544e99de", "Organization ID (default is bradfitz@golang.org's account)") - image = flag.String("image", "b9fcca88-fa85-4606-a2b2-3c8a7ff94fbd", "Disk image ID; default is the snapshot we made last") - num = flag.Int("n", 20, "Number of servers to create") + token = flag.String("token", "", "API token") + org = flag.String("org", "1f34701d-668b-441b-bf08-0b13544e99de", "Organization ID (default is bradfitz@golang.org's account)") + image = flag.String("image", "bebe2c6f-bbb5-4182-9cce-04cab2f44b2b", "Disk image ID; default is the snapshot we made last") + num = flag.Int("n", 0, "Number of servers to create; if zero, defaults to a value as a function of --staging") + tags = flag.String("tags", "", "Comma-separated list of tags. The build key tags should be of the form 'buildkey_linux-arm_HEXHEXHEXHEXHEX'. 
If empty, it's automatic.") + staging = flag.Bool("staging", false, "If true, deploy staging instances (with staging names and tags) instead of prod.") ) func main() { flag.Parse() + if *tags == "" { + if *staging { + *tags = defaultBuilderTags("gobuilder-staging.key") + } else { + *tags = defaultBuilderTags("gobuilder-master.key") + } + } + if *num == 0 { + if *staging { + *num = 5 + } else { + *num = 20 + } + } if *token == "" { file := filepath.Join(os.Getenv("HOME"), "keys/go-scaleway.token") slurp, err := ioutil.ReadFile(file) @@ -47,13 +65,21 @@ func main() { } for i := 1; i <= *num; i++ { - name := fmt.Sprintf("go-build-%d", i) + name := fmt.Sprintf("scaleway-prod-%02d", i) + if *staging { + name = fmt.Sprintf("scaleway-staging-%02d", i) + } _, ok := servers[name] if !ok { + tags := strings.Split(*tags, ",") + if *staging { + tags = append(tags, "staging") + } body, err := json.Marshal(createServerRequest{ Org: *org, Name: name, Image: *image, + Tags: tags, }) if err != nil { log.Fatal(err) @@ -79,6 +105,9 @@ func main() { log.Fatal(err) } for _, s := range serverList { + if strings.HasSuffix(s.Name, "-prep") || strings.HasSuffix(s.Name, "-hand") { + continue + } if s.State == "stopped" { log.Printf("Powering on %s = %v", s.ID, cl.PowerOn(s.ID)) } @@ -86,9 +115,10 @@ func main() { } type createServerRequest struct { - Org string `json:"organization"` - Name string `json:"name"` - Image string `json:"image"` + Org string `json:"organization"` + Name string `json:"name"` + Image string `json:"image"` + Tags []string `json:"tags"` } type Client struct { @@ -151,3 +181,20 @@ type IP struct { ID string `json:"id"` Address string `json:"address"` } + +// defaultBuilderTags returns the default value of the "tags" flag. +// It returns a comma-separated list of builder tags (each of the form buildkey_$(BUILDER)_$(SECRETHEX)). 
+func defaultBuilderTags(baseKeyFile string) string { + keyFile := filepath.Join(os.Getenv("HOME"), "keys", baseKeyFile) + slurp, err := ioutil.ReadFile(keyFile) + if err != nil { + log.Fatal(err) + } + var tags []string + for _, builder := range []string{"linux-arm", "linux-arm-arm5"} { + h := hmac.New(md5.New, bytes.TrimSpace(slurp)) + h.Write([]byte(builder)) + tags = append(tags, fmt.Sprintf("buildkey_%s_%x", builder, h.Sum(nil))) + } + return strings.Join(tags, ",") +} diff --git a/dashboard/builders.go b/dashboard/builders.go index ab256c030d..82281aa34b 100644 --- a/dashboard/builders.go +++ b/dashboard/builders.go @@ -151,14 +151,6 @@ func (c *BuildConfig) AllScript() string { // but for now we've only set up the scripts and verified that the main // configurations work. func (c *BuildConfig) SplitMakeRun() bool { - if strings.HasPrefix(c.Name, "linux-arm") { - // On Scaleway, we don't want to snapshot these to GCS - // yet. That might be a lot of bandwidth and we - // haven't measure their speed yet. We might want to - // store snapshots within Scaleway instead. For now: - // use the old way. - return false - } switch c.AllScript() { case "src/all.bash", "src/race.bash", "src/all.bat", "src/all.rc": // These we've verified to work. 
@@ -336,7 +328,6 @@ func init() { env: []string{"GOROOT_BOOTSTRAP=/go1.4"}, allScriptArgs: []string{ // Filtering pattern to buildall.bash: - // TODO: add darwin-386 and "^(linux-arm64|linux-ppc64|linux-ppc64le|nacl-arm|plan9-amd64|solaris-amd64|netbsd-386|netbsd-amd64|netbsd-arm|freebsd-arm|darwin-386)$", }, }) @@ -393,77 +384,11 @@ func init() { env: []string{"GOROOT_BOOTSTRAP=/go1.4"}, }) addBuilder(BuildConfig{ - Name: "linux-arm-qemu", - VMImage: "linux-buildlet-arm", - env: []string{"GOROOT_BOOTSTRAP=/go1.4", "IN_QEMU=1"}, - }) - addBuilder(BuildConfig{ - Name: "linux-arm", - IsReverse: true, - env: []string{"GOROOT_BOOTSTRAP=/usr/local/go"}, - }) - // Sharded ARM trybots: - addBuilder(BuildConfig{ - Name: "linux-arm-shard_test", - BuildletType: "linux-arm", - TryOnly: true, - IsReverse: true, - env: []string{ - "GOROOT_BOOTSTRAP=/usr/local/go", - "GOTESTONLY=^test$", - }, - }) - addBuilder(BuildConfig{ - Name: "linux-arm-shard_std_am", - BuildletType: "linux-arm", - TryOnly: true, - IsReverse: true, - env: []string{ - "GOROOT_BOOTSTRAP=/usr/local/go", - "GOTESTONLY=^go_test:[a-m]", - }, - }) - addBuilder(BuildConfig{ - Name: "linux-arm-shard_std_nz", - BuildletType: "linux-arm", - TryOnly: true, - IsReverse: true, - env: []string{ - "GOROOT_BOOTSTRAP=/usr/local/go", - "GOTESTONLY=^go_test:[n-z]", - }, - }) - addBuilder(BuildConfig{ - Name: "linux-arm-shard_runtimecpu", - BuildletType: "linux-arm", - TryOnly: true, - IsReverse: true, - env: []string{ - "GOROOT_BOOTSTRAP=/usr/local/go", - "GOTESTONLY=^runtime:cpu124$", - }, - }) - addBuilder(BuildConfig{ - Name: "linux-arm-shard_cgotest", - BuildletType: "linux-arm", - TryOnly: true, - IsReverse: true, - env: []string{ - "GOROOT_BOOTSTRAP=/usr/local/go", - "GOTESTONLY=^cgo_test$", - }, - }) - addBuilder(BuildConfig{ - Name: "linux-arm-shard_misc", - BuildletType: "linux-arm", - TryOnly: true, - IsReverse: true, - env: []string{ - "GOROOT_BOOTSTRAP=/usr/local/go", - 
"GOTESTONLY=!^(go_test:|test$|cgo_test$|runtime:cpu124$)", - }, + Name: "linux-arm", + IsReverse: true, + NumTestHelpers: 6, + env: []string{"GOROOT_BOOTSTRAP=/usr/local/go"}, }) - addBuilder(BuildConfig{ Name: "linux-arm-arm5", IsReverse: true, diff --git a/env/commit-watcher/Dockerfile b/env/commit-watcher/Dockerfile deleted file mode 100644 index a652056b0a..0000000000 --- a/env/commit-watcher/Dockerfile +++ /dev/null @@ -1,17 +0,0 @@ -# Copyright 2014 The Go Authors. All rights reserved. -# Use of this source code is governed by a BSD-style -# license that can be found in the LICENSE file. - -# Commit watcher for Go repos. - -FROM debian:wheezy -MAINTAINER golang-dev - -ENV DEBIAN_FRONTEND noninteractive - -ADD /scripts/install-apt-deps.sh /scripts/ -RUN /scripts/install-apt-deps.sh - -ADD /scripts/build-commit-watcher.sh /scripts/ -# Note that WATCHER_REV must be full for "git fetch origin $REV" later: -RUN GO_REV=go1.5 TOOLS_REV=1330b28 WATCHER_REV=629dcf77 /scripts/build-commit-watcher.sh && test -f /usr/local/bin/watcher diff --git a/env/commit-watcher/Makefile b/env/commit-watcher/Makefile deleted file mode 100644 index 7545b1b3d3..0000000000 --- a/env/commit-watcher/Makefile +++ /dev/null @@ -1,11 +0,0 @@ -# Copyright 2014 The Go Authors. All rights reserved. -# Use of this source code is governed by a BSD-style -# license that can be found in the LICENSE file. - -docker: Dockerfile - docker build -t go-commit-watcher . 
- -upload: docker - docker save go-commit-watcher | gzip | (cd ../../cmd/upload && go run upload.go --public go-builder-data/docker-commit-watcher.tar.gz) -dev-upload: - docker save go-commit-watcher | gzip | (cd ../../cmd/upload && go run upload.go --public dev-go-builder-data/docker-commit-watcher.tar.gz) diff --git a/env/commit-watcher/scripts/build-commit-watcher.sh b/env/commit-watcher/scripts/build-commit-watcher.sh deleted file mode 100755 index 0b97037c2f..0000000000 --- a/env/commit-watcher/scripts/build-commit-watcher.sh +++ /dev/null @@ -1,34 +0,0 @@ -set -ex - -export GOPATH=/gopath -export GOROOT=/goroot -PREFIX=/usr/local -: ${GO_REV:?"need to be set to the golang repo revision used to build the commit watcher."} -: ${TOOLS_REV:?"need to be set to the tools repo revision used to build the commit watcher."} -: ${WATCHER_REV:?"need to be set to the build repo revision for the commit watcher."} - -curl https://storage.googleapis.com/golang/go1.4.2.linux-amd64.tar.gz | tar -xz -C $HOME -mv $HOME/go $HOME/go1.4 - -mkdir -p $GOROOT -git clone https://go.googlesource.com/go $GOROOT -(cd $GOROOT/src && git reset --hard $GO_REV && find && ./make.bash) - -GO_TOOLS=$GOPATH/src/golang.org/x/tools -mkdir -p $GO_TOOLS -git clone https://go.googlesource.com/tools $GO_TOOLS -(cd $GO_TOOLS && git reset --hard $TOOLS_REV) - -GO_BUILD=$GOPATH/src/golang.org/x/build -mkdir -p $GO_BUILD -git clone https://go.googlesource.com/build $GO_BUILD - -# Um, this didn't seem to work? Old git version in wheezy? 
-#git fetch https://go.googlesource.com/build $WATCHER_REV:origin/dummy-commit # in case it's a pending CL -# Hack, instead: -cd $GO_BUILD && git fetch https://go.googlesource.com/build refs/changes/50/10750/5 - -mkdir -p $PREFIX/bin -(cd $GO_BUILD && git reset --hard $WATCHER_REV && GOBIN=$PREFIX/bin /goroot/bin/go install golang.org/x/build/cmd/watcher) - -rm -fR $GOROOT/bin $GOROOT/pkg $GOPATH diff --git a/env/linux-arm/scaleway/Dockerfile b/env/linux-arm/scaleway/Dockerfile deleted file mode 100644 index 325b304cbb..0000000000 --- a/env/linux-arm/scaleway/Dockerfile +++ /dev/null @@ -1,46 +0,0 @@ -# Copyright 2015 The Go Authors. All rights reserved. -# Use of this source code is governed by a BSD-style -# license that can be found in the LICENSE file. - -FROM armbuild/ubuntu:trusty - -MAINTAINER golang-dev -ENV DEBIAN_FRONTEND noninteractive - -RUN apt-get update -RUN apt-get install -y --no-install-recommends curl - -RUN echo "607573c55dc89d135c3c9c84bba6ba6095a37a1e go.tar.gz" > /tmp/go.tar.gz.sha1 -RUN cd /tmp && \ - curl --silent -o go.tar.gz http://dave.cheney.net/paste/go1.4.2.linux-arm~multiarch-armv7-1.tar.gz && \ - sha1sum -c go.tar.gz.sha1 && \ - tar -C /usr/local -zxvf go.tar.gz && \ - rm -rf go.tar.gz - -RUN apt-get install -y --no-install-recommends ca-certificates - -RUN mkdir /usr/local/gomaster -ENV GO_MASTER_VERSION d4bb72b4 -RUN curl https://go.googlesource.com/go/+archive/$GO_MASTER_VERSION.tar.gz | tar -C /usr/local/gomaster -zxv -ENV GOROOT /usr/local/gomaster -RUN echo "devel $GO_MASTER_VERSION" > $GOROOT/VERSION - -RUN apt-get install -y --no-install-recommends gcc -RUN apt-get install -y --no-install-recommends libc6-dev - -ENV GOROOT_BOOTSTRAP /usr/local/go -RUN cd $GOROOT/src && ./make.bash - -RUN apt-get install -y --no-install-recommends git-core - -ENV GOPATH /gopath -RUN mkdir /gopath -RUN $GOROOT/bin/go get golang.org/x/build/cmd/buildlet - -ADD run-buildlet.sh /usr/local/bin/run-buildlet.sh - -# Environment variables to be 
passed to "docker run" -# for linux-arm and linux-arm-arm5: -ENV BUILDKEY_ARM="" BUILDKEY_ARM5="" - -ENTRYPOINT ["/usr/local/bin/run-buildlet.sh"] diff --git a/env/linux-arm/scaleway/buildlet.service b/env/linux-arm/scaleway/buildlet.service new file mode 100644 index 0000000000..0fee9b1ab5 --- /dev/null +++ b/env/linux-arm/scaleway/buildlet.service @@ -0,0 +1,16 @@ +# See NOTES files + +[Unit] +Description=Go builder buildlet +After=network.target + +[Install] +WantedBy=network-online.target + +[Service] +Type=simple +ExecStartPre=/bin/sh -c '/usr/bin/curl -f -o /usr/local/bin/buildlet-stage0 https://storage.googleapis.com/go-builder-data/buildlet-stage0.linux-arm-scaleway?$(date +%s) && chmod +x /usr/local/bin/buildlet-stage0' +ExecStart=/usr/local/bin/buildlet-stage0 +Restart=always +RestartSec=2 +StartLimitInterval=0 diff --git a/env/linux-arm/scaleway/run-buildlet.sh b/env/linux-arm/scaleway/run-buildlet.sh deleted file mode 100644 index 22e8b958b7..0000000000 --- a/env/linux-arm/scaleway/run-buildlet.sh +++ /dev/null @@ -1,20 +0,0 @@ -#!/bin/bash - -# Meant to be run under Docker. - -if [ "$BUILDKEY_ARM" == "" ]; then -env - echo "ERROR: BUILDKEY_ARM not set. (using docker run?)" >&2 - exit 1 -fi -if [ "$BUILDKEY_ARM5" == "" ]; then -env - echo "ERROR: BUILDKEY_ARM5 not set. (using docker run?)" >&2 - exit 1 -fi - - -set -e -echo $BUILDKEY_ARM > /root/.gobuildkey-linux-arm -echo $BUILDKEY_ARM5 > /root/.gobuildkey-linux-arm-arm5 -exec /gopath/bin/buildlet -reverse=linux-arm,linux-arm-arm5 -coordinator farmer.golang.org:443 diff --git a/env/watcher-world/Dockerfile b/env/watcher-world/Dockerfile new file mode 100644 index 0000000000..896f6272aa --- /dev/null +++ b/env/watcher-world/Dockerfile @@ -0,0 +1,16 @@ +# Copyright 2015 The Go Authors. All rights reserved. +# Use of this source code is governed by a BSD-style +# license that can be found in the LICENSE file. + +# Commit watcher's environment for Go repos, but without the commit +# watcher itself. 
That is now folded into the coordinator binary +# itself. This Dockerfile is just the environment in which that binary +# then runs. + +FROM debian:wheezy +MAINTAINER golang-dev + +ENV DEBIAN_FRONTEND noninteractive + +ADD /scripts/install-apt-deps.sh /scripts/ +RUN /scripts/install-apt-deps.sh diff --git a/env/watcher-world/Makefile b/env/watcher-world/Makefile new file mode 100644 index 0000000000..9a664a611f --- /dev/null +++ b/env/watcher-world/Makefile @@ -0,0 +1,11 @@ +# Copyright 2014 The Go Authors. All rights reserved. +# Use of this source code is governed by a BSD-style +# license that can be found in the LICENSE file. + +docker: Dockerfile + docker build -t go-watcher-world . + +upload: docker + docker save go-watcher-world | gzip | (cd ../../cmd/upload && go run upload.go --public go-builder-data/docker-watcher-world.tar.gz) +dev-upload: + docker save go-watcher-world | gzip | (cd ../../cmd/upload && go run upload.go --public dev-go-builder-data/docker-watcher-world.tar.gz) diff --git a/env/commit-watcher/scripts/install-apt-deps.sh b/env/watcher-world/scripts/install-apt-deps.sh similarity index 57% rename from env/commit-watcher/scripts/install-apt-deps.sh rename to env/watcher-world/scripts/install-apt-deps.sh index 79cc2cd53e..9f205a7f0f 100755 --- a/env/commit-watcher/scripts/install-apt-deps.sh +++ b/env/watcher-world/scripts/install-apt-deps.sh @@ -2,12 +2,8 @@ set -ex apt-get update apt-get install -y --no-install-recommends ca-certificates -# For building Go's bootstrap 'dist' prog -apt-get install -y --no-install-recommends gcc libc6-dev # For interacting with the Go source & subrepos: apt-get install -y --no-install-recommends git-core -# For fetching go1.4 -apt-get install -y --no-install-recommends curl apt-get clean rm -fr /var/lib/apt/lists diff --git a/pargzip/pargzip.go b/pargzip/pargzip.go new file mode 100644 index 0000000000..ff5336904a --- /dev/null +++ b/pargzip/pargzip.go @@ -0,0 +1,209 @@ +// Copyright 2015 The Go Authors. 
All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package pargzip contains a parallel gzip writer implementation. By +// compressing each chunk of data in parallel, all the CPUs on the +// machine can be used, at a slight loss of compression efficiency. +// In addition, this implementation can use the system gzip binary as +// a child process, which is faster than Go's native implementation. +package pargzip + +import ( + "bufio" + "bytes" + "compress/gzip" + "io" + "os/exec" + "runtime" + "strings" + "sync" +) + +// A Writer is an io.WriteCloser. +// Writes to a Writer are compressed and written to w. +// +// Any exported fields may only be mutated before the first call to +// Write. +type Writer struct { + // UseSystemGzip controls whether the system gzip binary is + // used. The default from NewWriter is true. + UseSystemGzip bool + + // ChunkSize is the number of bytes to gzip at once. + // The default from NewWriter is 1MB. + ChunkSize int + + // Parallel is the number of chunks to compress in parallel. + // The default from NewWriter is runtime.NumCPU(). + Parallel int + + w io.Writer + bw *bufio.Writer + + allWritten chan struct{} // when writing goroutine ends + + sem chan bool // semaphore bounding compressions in flight + chunkc chan *writeChunk // closed on Close + + mu sync.Mutex // guards following + closed bool + err error // sticky write error +} + +type writeChunk struct { + zw *Writer + p string // uncompressed + + donec chan struct{} // closed on completion + + // one of following is set: + z []byte // compressed + err error // exec error +} + +// compress runs the gzip child process. +// It runs in its own goroutine. 
+func (c *writeChunk) compress() (err error) { + defer func() { + if err != nil { + c.err = err + } + close(c.donec) + <-c.zw.sem + }() + var zbuf bytes.Buffer + if c.zw.UseSystemGzip { + cmd := exec.Command("gzip") + cmd.Stdin = strings.NewReader(c.p) + cmd.Stdout = &zbuf + if err := cmd.Run(); err != nil { + return err + } + } else { + zw := gzip.NewWriter(&zbuf) + if _, err := io.Copy(zw, strings.NewReader(c.p)); err != nil { + return err + } + if err := zw.Close(); err != nil { + return err + } + } + c.z = zbuf.Bytes() + return nil +} + +// NewWriter returns a new Writer. +// Writes to the returned writer are compressed and written to w. +// +// It is the caller's responsibility to call Close on the WriteCloser +// when done. Writes may be buffered and not flushed until Close. +// +// Any fields on Writer may only be modified before the first call to +// Write. +func NewWriter(w io.Writer) *Writer { + return &Writer{ + w: w, + allWritten: make(chan struct{}), + + UseSystemGzip: true, + ChunkSize: 1 << 20, + Parallel: runtime.NumCPU(), + } +} + +func (w *Writer) didInit() bool { return w.bw != nil } + +func (w *Writer) init() { + w.bw = bufio.NewWriterSize(newChunkWriter{w}, w.ChunkSize) + w.chunkc = make(chan *writeChunk, w.Parallel+1) + w.sem = make(chan bool, w.Parallel) + go func() { + defer close(w.allWritten) + for c := range w.chunkc { + if err := w.writeCompressedChunk(c); err != nil { + return + } + } + }() +} + +func (w *Writer) startChunk(p []byte) { + w.sem <- true // block until we can begin + c := &writeChunk{ + zw: w, + p: string(p), // string, since the bufio.Writer owns the slice + donec: make(chan struct{}), + } + go c.compress() // receives from w.sem + w.chunkc <- c +} + +func (w *Writer) writeCompressedChunk(c *writeChunk) (err error) { + defer func() { + if err != nil { + w.mu.Lock() + defer w.mu.Unlock() + if w.err == nil { + w.err = err + } + } + }() + <-c.donec + if c.err != nil { + return c.err + } + _, err = w.w.Write(c.z) + return +} 
+ +func (w *Writer) Write(p []byte) (n int, err error) { + if !w.didInit() { + w.init() + } + return w.bw.Write(p) +} + +func (w *Writer) Close() error { + w.mu.Lock() + err, wasClosed := w.err, w.closed + w.closed = true + w.mu.Unlock() + if wasClosed { + return nil + } + if !w.didInit() { + return nil + } + if err != nil { + return err + } + + w.bw.Flush() + close(w.chunkc) + <-w.allWritten // wait for writing goroutine to end + + w.mu.Lock() + err = w.err + w.mu.Unlock() + return err +} + +// newChunkWriter gets large chunks to compress and write to zw. +type newChunkWriter struct { + zw *Writer +} + +func (cw newChunkWriter) Write(p []byte) (n int, err error) { + n = len(p) + max := cw.zw.ChunkSize + for len(p) > 0 { + chunk := p + if len(chunk) > max { + chunk = chunk[:max] + } + p = p[len(chunk):] + cw.zw.startChunk(chunk) + } + return +} diff --git a/pargzip/pargzip_test.go b/pargzip/pargzip_test.go new file mode 100644 index 0000000000..e6c0ce98ba --- /dev/null +++ b/pargzip/pargzip_test.go @@ -0,0 +1,52 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. 
+ +package pargzip + +import ( + "bytes" + "compress/gzip" + "io" + "strings" + "testing" + "time" +) + +func TestWriter(t *testing.T) { + var in bytes.Buffer + big := strings.Repeat("a", 4<<10) + for in.Len() < 10<<20 { + for i := 0; i < 256; i++ { + in.WriteByte(byte(i)) + in.WriteString(big) + } + } + t.Logf("input size = %v", in.Len()) + var zbuf bytes.Buffer + zw := NewWriter(&zbuf) + zw.ChunkSize = 1 << 20 + zw.Parallel = 4 + zw.UseSystemGzip = true + t0 := time.Now() + if n, err := io.Copy(zw, bytes.NewReader(in.Bytes())); err != nil { + t.Fatalf("Copy: %v", err) + } else { + t.Logf("Copied %d bytes", n) + } + if err := zw.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + td := time.Since(t0) + t.Logf("Compressed size: %v (%0.2f%%) in %v", zbuf.Len(), float64(zbuf.Len())/float64(in.Len())*100, td) + + var back bytes.Buffer + zr, _ := gzip.NewReader(bytes.NewReader(zbuf.Bytes())) + if _, err := io.Copy(&back, zr); err != nil { + t.Fatalf("uncompress Copy: %v", err) + } + if !bytes.Equal(in.Bytes(), back.Bytes()) { + t.Error("decompression failed.") + } + t.Logf("correctly read back %d bytes", back.Len()) +} diff --git a/revdial/revdial.go b/revdial/revdial.go new file mode 100644 index 0000000000..5cb75896be --- /dev/null +++ b/revdial/revdial.go @@ -0,0 +1,518 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package revdial implements a dialer and listener which multiplexes +// multiples connections over one. 
+package revdial + +/* +Protocol: + +7-byte frame header: + +uint8: frame type + 0 new conn (server to peer only) + 1 close conn (either way) + 2 write (either way) +uint32: conn id (coordinator chooses, no ack from peer) +uint16: length of rest of data (for all frame types) + +*/ + +import ( + "bufio" + "bytes" + "encoding/binary" + "errors" + "fmt" + "io" + "log" + "net" + "sync" + "time" +) + +type Dialer struct { + rw *bufio.ReadWriter + closer io.Closer + + mu sync.Mutex // guards following, and writes to rw + readErr error // non-nil when peer ends + closed bool + conns map[uint32]*conn + nextID uint32 +} + +// NewDialer returns the side of the connection which will initiate +// new connections. This will typically be the side which did the +// HTTP Hijack. The io.Closer is what gets closed by the Close +// or by any errors. It will typically be the hijacked Conn. +func NewDialer(rw *bufio.ReadWriter, c io.Closer) *Dialer { + d := &Dialer{ + rw: rw, + closer: c, + conns: map[uint32]*conn{}, + nextID: 1, // just for debugging, not seeing zeros + } + go func() { + err := readFrames(rw.Reader, d) + d.mu.Lock() + defer d.mu.Unlock() + if d.closed { + // Be quiet; errors are expected + return + } + defer c.Close() + if err == nil { + err = errors.New("revdial: Dialer.readFrames terminated with success") + } + d.readErr = err + }() + return d +} + +func (d *Dialer) Close() error { + d.mu.Lock() + defer d.mu.Unlock() + if d.closed { + return nil + } + d.closed = true + for _, c := range d.conns { + c.peerClose() + } + return d.closer.Close() +} + +func (d *Dialer) conn(id uint32) (*conn, error) { + d.mu.Lock() + defer d.mu.Unlock() + c, ok := d.conns[id] + if !ok { + return nil, fmt.Errorf("revdial.Dialer saw reference to unknown conn %v", id) + } + return c, nil +} + +var ( + errRole = errors.New("revdial: invalid frame type received for role") + errConnID = errors.New("revdial: invalid connection ID") +) + +func (d *Dialer) onFrame(f frame) error { + switch 
f.command { + case frameNewConn: + return errRole + case frameCloseConn: + c, err := d.conn(f.connID) + if err != nil { + // Oh well. + return nil + } + c.peerClose() + return nil + case frameWrite: + c, err := d.conn(f.connID) + if err != nil { + // Ignore writes on bogus conn IDs; assume it + // just recently closed. + return nil + } + if _, err := c.peerWrite(f.payload); err != nil { + c.mu.Lock() + closed := c.closed + c.mu.Unlock() + if closed { + // Conn is now closed. Assume error + // was "io: read/write on closed pipe" + // and it was just data in-flight + // while this side closed. So, don't abort + // the frame-reading loop. + return nil + } + return err + } + return nil + default: + // Ignore unknown frame types. + } + return nil +} + +func (d *Dialer) Dial() (net.Conn, error) { + d.mu.Lock() + defer d.mu.Unlock() + if d.closed { + return nil, errors.New("revdial: Dial on closed client") + } + var id uint32 + for { + id = d.nextID + d.nextID++ // wrapping is okay; we check for free ones, assuming sparse + if _, inUse := d.conns[id]; inUse { + continue + } + break + } + c := &conn{ + id: id, + wmu: &d.mu, + w: d.rw.Writer, + unregConn: d.unregConn, + } + c.cond = sync.NewCond(&c.mu) + d.conns[id] = c + err := writeFrame(c, frame{ + command: frameNewConn, + connID: id, + }) + return c, err +} + +// c.wmu must be held. 
+func writeFrame(c *conn, f frame) error { + if len(f.payload) > 0xffff { + return errors.New("revdial: frame too long") + } + w := c.w + hdr := [7]byte{ + byte(f.command), + byte(f.connID >> 24), + byte(f.connID >> 16), + byte(f.connID >> 8), + byte(f.connID), + byte(len(f.payload) >> 8), + byte(len(f.payload)), + } + if _, err := w.Write(hdr[:]); err != nil { + return err + } + if _, err := w.Write(f.payload); err != nil { + return err + } + return w.Flush() +} + +type conn struct { + id uint32 + + wmu *sync.Mutex // held while writing & calling unreg + w *bufio.Writer + unregConn func(id uint32) // called with wmu held + + mu sync.Mutex + cond *sync.Cond + buf []byte // unread data + eof bool // remote side closed + closed bool // our side closed (with Close) +} + +var errUnsupported = errors.New("revdial: unsupported Conn operation") + +func (c *conn) SetDeadline(t time.Time) error { return errUnsupported } +func (c *conn) SetReadDeadline(t time.Time) error { return errUnsupported } +func (c *conn) SetWriteDeadline(t time.Time) error { return errUnsupported } +func (c *conn) LocalAddr() net.Addr { return fakeAddr{} } +func (c *conn) RemoteAddr() net.Addr { return fakeAddr{} } + +func (c *conn) Close() error { + c.mu.Lock() + if c.closed { + c.mu.Unlock() + return nil + } + c.closed = true + c.cond.Signal() + c.mu.Unlock() + + c.wmu.Lock() + c.unregConn(c.id) + defer c.wmu.Unlock() + return writeFrame(c, frame{ + command: frameCloseConn, + connID: c.id, + }) +} + +func (d *Dialer) unregConn(id uint32) { + delete(d.conns, id) +} + +func (c *conn) peerWrite(p []byte) (n int, err error) { + c.mu.Lock() + defer c.mu.Unlock() + defer c.cond.Signal() + // TODO(bradfitz): bound this, like http2's buffer/pipe code + c.buf = append(c.buf, p...) 
+ return len(p), nil +} + +func (c *conn) peerClose() { + c.mu.Lock() + defer c.mu.Unlock() + defer c.cond.Broadcast() + c.eof = true +} + +func (c *conn) Read(p []byte) (n int, err error) { + c.mu.Lock() + defer c.mu.Unlock() + defer c.cond.Signal() // for when writers block + for len(c.buf) == 0 && !c.eof && !c.closed { + c.cond.Wait() + } + if c.closed { + return 0, errors.New("revdial: Read on closed connection") + } + if len(c.buf) == 0 && c.eof { + return 0, io.EOF + } + n = copy(p, c.buf) + c.buf = c.buf[:copy(c.buf, c.buf[n:])] // slide down + return n, nil +} + +func (c *conn) Write(p []byte) (n int, err error) { + c.mu.Lock() + defer c.mu.Unlock() + if c.closed { + return 0, errors.New("revdial: Write on Closed conn") + } + const max = 0xffff // max chunk size + for len(p) > 0 { + chunk := p + if len(chunk) > max { + chunk = chunk[:max] + } + c.wmu.Lock() + err = writeFrame(c, frame{ + command: frameWrite, + connID: c.id, + payload: chunk, + }) + c.wmu.Unlock() + if err != nil { + return n, err + } + n += len(chunk) + p = p[len(chunk):] + } + return n, nil +} + +type frameType uint8 + +const ( + frameNewConn frameType = 'N' + frameCloseConn frameType = 'C' + frameWrite frameType = 'W' +) + +type frame struct { + command frameType + connID uint32 + payload []byte // not owned +} + +func (f frame) String() string { + p := f.payload + if len(p) > 64 { + p = p[:64] + } + return fmt.Sprintf("[frame %q conn %v, %q]", f.command, f.connID, p) +} + +// onFramer is the interface for something that can get callbacks on +// new frames being received. 
+type onFramer interface { + onFrame(f frame) error +} + +const debug = false + +func readFrames(br *bufio.Reader, of onFramer) error { + var hdr [7]byte + var payload bytes.Buffer + for { + _, err := io.ReadFull(br, hdr[:]) + if err != nil { + return err + } + f := frame{ + command: frameType(hdr[0]), + connID: binary.BigEndian.Uint32(hdr[1:5]), + } + paySize := binary.BigEndian.Uint16(hdr[5:7]) + if debug { + log.Printf("Read frame header: %+v (len %v)", f, paySize) + } + payload.Reset() + if paySize > 0 { + if _, err := io.CopyN(&payload, br, int64(paySize)); err != nil { + return err + } + if payload.Len() != int(paySize) { + panic("invariant") + } + } + f.payload = payload.Bytes() + if debug { + log.Printf("Read full frame: %+v (len %v)", f, paySize) + } + err = of.onFrame(f) + if debug { + log.Printf("onFrame = %v", err) + } + if err != nil { + return err + } + } +} + +func NewListener(rw *bufio.ReadWriter) *Listener { + ln := &Listener{ + connc: make(chan net.Conn, 8), // arbitrary + conns: map[uint32]*conn{}, + rw: rw, + } + go func() { + err := readFrames(rw.Reader, ln) + ln.mu.Lock() + defer ln.mu.Unlock() + if ln.closed { + return + } + if err == nil { + err = errors.New("revdial: Listener.readFrames terminated with success") + } + ln.readErr = err + for _, c := range ln.conns { + c.peerClose() + } + go ln.Close() + }() + return ln +} + +var _ net.Listener = (*Listener)(nil) + +type Listener struct { + rw *bufio.ReadWriter + connc chan net.Conn + + mu sync.Mutex // guards below, closing connc, and writing to rw + readErr error + conns map[uint32]*conn + closed bool +} + +func (ln *Listener) Accept() (net.Conn, error) { + c, ok := <-ln.connc + if !ok { + ln.mu.Lock() + err := ln.readErr + ln.mu.Unlock() + if err != nil { + return nil, fmt.Errorf("revdial: Listener closed; %v", err) + } + return nil, errors.New("revdial: Listener closed") + } + return c, nil +} + +func (ln *Listener) Close() error { + ln.mu.Lock() + defer ln.mu.Unlock() + if ln.closed { + 
return nil + } + ln.closed = true + close(ln.connc) + return nil +} + +func (ln *Listener) Addr() net.Addr { return fakeAddr{} } + +func (ln *Listener) closeConn(id uint32) error { + ln.mu.Lock() + defer ln.mu.Unlock() + c, ok := ln.conns[id] + if !ok { + return nil + } + c.peerClose() + delete(ln.conns, id) + return nil +} + +func (ln *Listener) newConn(id uint32) error { + ln.mu.Lock() + if _, dup := ln.conns[id]; dup { + ln.mu.Unlock() + return errors.New("revdial: peer newConn with already-open connID") + } + c := &conn{ + id: id, + wmu: &ln.mu, + w: ln.rw.Writer, + unregConn: ln.unregConn, + } + c.cond = sync.NewCond(&c.mu) + ln.conns[id] = c + ln.mu.Unlock() + ln.connc <- c + return nil +} + +func (ln *Listener) unregConn(id uint32) { + // Do nothing, unlike the outbound side. +} + +func (ln *Listener) conn(id uint32) (*conn, error) { + ln.mu.Lock() + defer ln.mu.Unlock() + c, ok := ln.conns[id] + if !ok { + return nil, fmt.Errorf("revdial.Listener saw reference to unknown conn %v", id) + } + return c, nil +} + +func (ln *Listener) onFrame(f frame) error { + switch f.command { + case frameNewConn: + return ln.newConn(f.connID) + case frameCloseConn: + return ln.closeConn(f.connID) + case frameWrite: + c, err := ln.conn(f.connID) + if err != nil { + // Ignore writes on bogus conn IDs; assume it + // just recently closed. + return nil + } + if _, err := c.peerWrite(f.payload); err != nil { + c.mu.Lock() + closed := c.closed + c.mu.Unlock() + if closed { + // Conn is now closed. Assume error + // was "io: read/write on closed pipe" + // and it was just data in-flight + // while this side closed. So, don't abort + // the frame-reading loop. + return nil + } + return err + } + default: + // Ignore unknown frame types. 
+ } + return nil +} + +type fakeAddr struct{} + +func (fakeAddr) Network() string { return "revdial" } +func (fakeAddr) String() string { return "revdialconn" } diff --git a/revdial/revdial_test.go b/revdial/revdial_test.go new file mode 100644 index 0000000000..1b75faaa70 --- /dev/null +++ b/revdial/revdial_test.go @@ -0,0 +1,189 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package revdial + +import ( + "bufio" + "bytes" + "io" + "io/ioutil" + "net" + "sync" + "testing" +) + +func TestDialer(t *testing.T) { + pr, pw := io.Pipe() + var out bytes.Buffer + d := NewDialer(bufio.NewReadWriter( + bufio.NewReader(pr), + bufio.NewWriter(&out), + ), ioutil.NopCloser(nil)) + + c, err := d.Dial() + if err != nil { + t.Fatal(err) + } + if c.(*conn).id != 1 { + t.Fatalf("first id = %d; want 1", c.(*conn).id) + } + c.Close() // to verify incoming write frames don't block + + c, err = d.Dial() + if err != nil { + t.Fatal(err) + } + if c.(*conn).id != 2 { + t.Fatalf("second id = %d; want 2", c.(*conn).id) + } + + if g, w := len(d.conns), 1; g != w { + t.Errorf("size of conns map after dial+close+dial = %v; want %v", g, w) + } + + go func() { + // Write "b" and then "ar", and read it as "bar" + pw.Write([]byte{byte(frameWrite), 0, 0, 0, 2, 0, 1, 'b'}) + pw.Write([]byte{byte(frameWrite), 0, 0, 0, 1, 0, 1, 'x'}) // verify doesn't block first conn + pw.Write([]byte{byte(frameWrite), 0, 0, 0, 2, 0, 2, 'a', 'r'}) + }() + buf := make([]byte, 3) + if n, err := io.ReadFull(c, buf); err != nil { + t.Fatalf("ReadFul = %v (%q), %v", n, buf[:n], err) + } + if string(buf) != "bar" { + t.Fatalf("read = %q; want bar", buf) + } + if _, err := io.WriteString(c, "hello, world"); err != nil { + t.Fatal(err) + } + + got := out.String() + want := "N\x00\x00\x00\x01\x00\x00" + + "C\x00\x00\x00\x01\x00\x00" + + "N\x00\x00\x00\x02\x00\x00" + + "W\x00\x00\x00\x02\x00\fhello, world" + 
if got != want { + t.Errorf("Written on wire differs.\nWrote: %q\n Want: %q", got, want) + } +} + +func TestListener(t *testing.T) { + pr, pw := io.Pipe() + var out bytes.Buffer + ln := NewListener(bufio.NewReadWriter( + bufio.NewReader(pr), + bufio.NewWriter(&out), + )) + go io.WriteString(pw, "N\x00\x00\x00\x42\x00\x00") + c, err := ln.Accept() + if err != nil { + t.Fatal(err) + } + if g, w := c.(*conn).id, uint32(0x42); g != w { + t.Errorf("conn id = %d; want %d", g, w) + } + go func() { + io.WriteString(pw, "W\x00\x00\x00\x42\x00\x03"+"foo") + io.WriteString(pw, "W\x00\x00\x00\x42\x00\x03"+"bar") + io.WriteString(pw, "C\x00\x00\x00\x42\x00\x00") + }() + slurp, err := ioutil.ReadAll(c) + if g, w := string(slurp), "foobar"; g != w { + t.Errorf("Read %q; want %q", g, w) + } + if err != nil { + t.Errorf("Read = %v", err) + } + ln.Close() + if _, err := ln.Accept(); err == nil { + t.Fatalf("Accept after Closed returned nil error; want error") + } + + io.WriteString(c, "first write") + io.WriteString(c, "second write") + c.Close() + got := out.String() + want := "W\x00\x00\x00B\x00\vfirst write" + + "W\x00\x00\x00B\x00\fsecond write" + + "C\x00\x00\x00B\x00\x00" + if got != want { + t.Errorf("Wrote: %q\n Want: %q", got, want) + } +} + +func TestInterop(t *testing.T) { + var na, nb net.Conn + if true { + tln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer tln.Close() + na, err = net.Dial("tcp", tln.Addr().String()) + if err != nil { + t.Fatal(err) + } + nb, err = tln.Accept() + if err != nil { + t.Fatal(err) + } + } else { + // TODO(bradfitz): also run this way + na, nb = net.Pipe() + } + defer na.Close() + defer nb.Close() + ln := NewListener(bufio.NewReadWriter( + bufio.NewReader(na), + bufio.NewWriter(na), + )) + defer ln.Close() + d := NewDialer(bufio.NewReadWriter( + bufio.NewReader(nb), + bufio.NewWriter(nb), + ), ioutil.NopCloser(nil)) + defer d.Close() + + var wg sync.WaitGroup + for i := 0; i < 2; i++ { + wg.Add(1) + go 
func() { + defer wg.Done() + c, err := d.Dial() + if err != nil { + t.Errorf("Dial: %v", err) + return + } + defer c.Close() + sc, err := ln.Accept() + if err != nil { + t.Errorf("Accept: %v", err) + return + } + defer sc.Close() + const cmsg = "Some client message" + const smsg = "Some server message" + io.WriteString(c, cmsg) // TODO(bradfitz): why the 3/500 failure rate when these are "go io.WriteString"? + io.WriteString(sc, smsg) + buf := make([]byte, len(cmsg)) + if n, err := io.ReadFull(c, buf); err != nil { + t.Errorf("reading from client conn: (%d %q, %v)", n, buf[:n], err) + return + } + if string(buf) != smsg { + t.Errorf("client read %q; want %q", buf, smsg) + } + if _, err := io.ReadFull(sc, buf); err != nil { + t.Errorf("reading from server conn: %v", err) + return + } + if string(buf) != cmsg { + t.Errorf("server read %q; want %q", buf, cmsg) + } + }() + } + wg.Wait() +}