Skip to content

Commit

Permalink
🌱 addr.Suggest should lock a file instead of memory
Browse files Browse the repository at this point in the history
Envtest is often running in parallel when using go test, which spins up
multiple indipendent go test processes that cannot talk to each other.
The address suggestion code, mostly used to find an open port, can
cause port collisions and a race condition between different envtests
running at the same time.

This change switches the internal memory to use a file based system that
creates a file.

Signed-off-by: Vince Prignano <[email protected]>
  • Loading branch information
vincepri committed Jun 23, 2021
1 parent 7d83250 commit 58c17f6
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 46 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/prometheus/client_model v0.2.0
go.uber.org/goleak v1.1.10
go.uber.org/zap v1.17.0
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40
golang.org/x/time v0.0.0-20210611083556-38a9dc6acbc6
gomodules.xyz/jsonpatch/v2 v2.2.0
google.golang.org/appengine v1.6.7 // indirect
Expand Down
10 changes: 6 additions & 4 deletions hack/check-everything.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ source ${hack_dir}/common.sh
tmp_root=/tmp
kb_root_dir=$tmp_root/kubebuilder

ENVTEST_K8S_VERSION=${ENVTEST_K8S_VERSION:-"1.21.2"}
# Run verification scripts.
${hack_dir}/verify.sh

# set up envtest tools if necessary
# Envtest.
ENVTEST_K8S_VERSION=${ENVTEST_K8S_VERSION:-"1.21.2"}

header_text "installing envtest tools@${ENVTEST_K8S_VERSION} with setup-envtest if necessary"
tmp_bin=/tmp/cr-tests-bin
Expand All @@ -35,9 +37,9 @@ tmp_bin=/tmp/cr-tests-bin
cd ${hack_dir}/../tools/setup-envtest
GOBIN=${tmp_bin} go install .
)
source <(${tmp_bin}/setup-envtest use --use-env -p env ${ENVTEST_K8S_VERSION})
export KUBEBUILDER_ASSETS="$(${tmp_bin}/setup-envtest use --use-env -p path "${ENVTEST_K8S_VERSION}")"

${hack_dir}/verify.sh
# Run tests.
${hack_dir}/test-all.sh

header_text "confirming examples compile (via go install)"
Expand Down
8 changes: 6 additions & 2 deletions hack/verify.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,9 @@ make generate
header_text "running golangci-lint"
make lint

header_text "verifying modules"
make modules verify-modules
# Only run module verification in CI, otherwise updating
# go module locally (which is a valid operation) causes `make test` to fail.
if [[ -n ${CI} ]]; then
header_text "verifying modules"
make modules verify-modules
fi
21 changes: 21 additions & 0 deletions pkg/internal/flock/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package flock is copied from k8s.io/kubernetes/pkg/util/flock to avoid
// importing k8s.io/kubernetes as a dependency.
//
// Provides file locking functionalities on unix systems.
package flock
24 changes: 24 additions & 0 deletions pkg/internal/flock/flock_other.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// +build !linux,!darwin,!freebsd,!openbsd,!netbsd,!dragonfly

/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package flock

// Acquire is not implemented on non-unix systems.
func Acquire(path string) error {
return nil
}
35 changes: 35 additions & 0 deletions pkg/internal/flock/flock_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// +build linux darwin freebsd openbsd netbsd dragonfly

/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package flock

import "golang.org/x/sys/unix"

// Acquire acquires a lock on a file for the duration of the process. This method
// is reentrant.
func Acquire(path string) error {
fd, err := unix.Open(path, unix.O_CREAT|unix.O_RDWR|unix.O_CLOEXEC, 0600)
if err != nil {
return err
}

// We don't need to close the fd since we should hold
// it until the process exits.

return unix.Flock(fd, unix.LOCK_EX)
}
103 changes: 67 additions & 36 deletions pkg/internal/testing/addr/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,78 +18,109 @@ package addr

import (
"fmt"
"io/fs"
"net"
"sync"
"os"
"path/filepath"
"strings"
"time"

"sigs.k8s.io/controller-runtime/pkg/internal/flock"
)

// TODO(directxman12): interface / release functionality for external port managers

const (
portReserveTime = 1 * time.Minute
portReserveTime = 10 * time.Minute
portConflictRetry = 100
portFilePrefix = "port-"
)

var (
cacheDir string
)

type portCache struct {
lock sync.Mutex
ports map[int]time.Time
func init() {
baseDir, err := os.UserCacheDir()
if err != nil {
baseDir = os.TempDir()
}
cacheDir = filepath.Join(baseDir, "kubebuilder-envtest")
if err := os.MkdirAll(cacheDir, 0750); err != nil {
panic(err)
}
}

func (c *portCache) add(port int) bool {
c.lock.Lock()
defer c.lock.Unlock()
// remove outdated port
for p, t := range c.ports {
if time.Since(t) > portReserveTime {
delete(c.ports, p)
type portCache struct{}

func (c *portCache) add(port int) (bool, error) {
// Remove outdated ports.
if err := fs.WalkDir(os.DirFS(cacheDir), ".", func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if d.IsDir() || !d.Type().IsRegular() || !strings.HasPrefix(path, portFilePrefix) {
return nil
}
info, err := d.Info()
if err != nil {
return err
}
if time.Since(info.ModTime()) > portReserveTime {
if err := os.Remove(filepath.Join(cacheDir, path)); err != nil {
return err
}
}
return nil
}); err != nil {
return false, err
}
// try allocating new port
if _, ok := c.ports[port]; ok {
return false
// Try allocating new port, by acquiring a file.
if err := flock.Acquire(fmt.Sprintf("%s/%s%d", cacheDir, portFilePrefix, port)); os.IsExist(err) {
return false, nil
} else if err != nil {
return false, err
}
c.ports[port] = time.Now()
return true
return true, nil
}

var cache = &portCache{
ports: make(map[int]time.Time),
}
var cache = &portCache{}

func suggest(listenHost string) (port int, resolvedHost string, err error) {
func suggest(listenHost string) (int, string, error) {
if listenHost == "" {
listenHost = "localhost"
}
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(listenHost, "0"))
if err != nil {
return
return -1, "", err
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
return
return -1, "", err
}
if err := l.Close(); err != nil {
return -1, "", err
}
port = l.Addr().(*net.TCPAddr).Port
defer func() {
err = l.Close()
}()
resolvedHost = addr.IP.String()
return
return l.Addr().(*net.TCPAddr).Port,
addr.IP.String(),
nil
}

// Suggest suggests an address a process can listen on. It returns
// a tuple consisting of a free port and the hostname resolved to its IP.
// It makes sure that new port allocated does not conflict with old ports
// allocated within 1 minute.
func Suggest(listenHost string) (port int, resolvedHost string, err error) {
func Suggest(listenHost string) (int, string, error) {
for i := 0; i < portConflictRetry; i++ {
port, resolvedHost, err = suggest(listenHost)
port, resolvedHost, err := suggest(listenHost)
if err != nil {
return
return -1, "", err
}
if cache.add(port) {
return
if ok, err := cache.add(port); ok {
return port, resolvedHost, nil
} else if err != nil {
return -1, "", err
}
}
err = fmt.Errorf("no free ports found after %d retries", portConflictRetry)
return
return -1, "", fmt.Errorf("no free ports found after %d retries", portConflictRetry)
}
10 changes: 6 additions & 4 deletions pkg/internal/testing/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,12 @@ func pollURLUntilOK(url url.URL, interval time.Duration, ready chan bool, stopCh
// Stop stops this process gracefully, waits for its termination, and cleans up
// the CertDir if necessary.
func (ps *State) Stop() error {
// Always clear the directory if we need to.
defer func() {
if ps.DirNeedsCleaning {
_ = os.RemoveAll(ps.Dir)
}
}()
if ps.Cmd == nil {
return nil
}
Expand All @@ -267,9 +273,5 @@ func (ps *State) Stop() error {
return fmt.Errorf("timeout waiting for process %s to stop", path.Base(ps.Path))
}
ps.ready = false
if ps.DirNeedsCleaning {
return os.RemoveAll(ps.Dir)
}

return nil
}

0 comments on commit 58c17f6

Please sign in to comment.