From 8dd7c4f9975a9e3ea246c02ac2e3a112f48f6260 Mon Sep 17 00:00:00 2001 From: Adam Langley Date: Mon, 22 May 2023 20:07:03 -0400 Subject: [PATCH] acvptool: clean up better. The Close() method of the middle often wasn't getting called because `os.Exit(0)` was used in some places. Once that's fixed, it's clear that the queue of pending reads needed to be closed before waiting for the reader goroutine to finish. Lastly, don't bother trying to record the error that the reader saw: just panic the process if the modulewrapper dies during processing. Change-Id: Icf077cefd0ace2ef721a493f99fede6269531257 Reviewed-on: https://boringssl-review.googlesource.com/c/boringssl/+/60045 Commit-Queue: David Benjamin Auto-Submit: Adam Langley Reviewed-by: David Benjamin (cherry picked from commit cf3851c6c9380368373ac127cde1f4aa7159fba3) --- util/fipstools/acvp/acvptool/acvp.go | 6 +- .../acvp/acvptool/subprocess/subprocess.go | 188 ++++++++++++++++-- 2 files changed, 174 insertions(+), 20 deletions(-) diff --git a/util/fipstools/acvp/acvptool/acvp.go b/util/fipstools/acvp/acvptool/acvp.go index 2e1883d5699..71452341164 100644 --- a/util/fipstools/acvp/acvptool/acvp.go +++ b/util/fipstools/acvp/acvptool/acvp.go @@ -570,14 +570,14 @@ func main() { } os.Stdout.Write(regcapBytes) os.Stdout.WriteString("\n") - os.Exit(0) + return } if len(*jsonInputFile) > 0 { if err := processFile(*jsonInputFile, supportedAlgos, middle); err != nil { log.Fatalf("failed to process input file: %s", err) } - os.Exit(0) + return } var config Config @@ -783,7 +783,7 @@ func main() { if len(*fetchFlag) > 0 { io.WriteString(fetchOutputTee, "]\n") - os.Exit(0) + return } if ok, err := getResultsWithRetry(server, url); err != nil { diff --git a/util/fipstools/acvp/acvptool/subprocess/subprocess.go b/util/fipstools/acvp/acvptool/subprocess/subprocess.go index c4a37b2eec8..f1cb5fa885c 100644 --- a/util/fipstools/acvp/acvptool/subprocess/subprocess.go +++ b/util/fipstools/acvp/acvptool/subprocess/subprocess.go @@ -30,6 +30,9 @@ import ( // that don't call a server. type Transactable interface { Transact(cmd string, expectedResults int, args ...[]byte) ([][]byte, error) + TransactAsync(cmd string, expectedResults int, args [][]byte, callback func([][]byte) error) + Barrier(callback func()) error + Flush() error } // Subprocess is a "middle" layer that interacts with a FIPS module via running @@ -39,6 +42,24 @@ type Subprocess struct { stdin io.WriteCloser stdout io.ReadCloser primitives map[string]primitive + // supportsFlush is true if the modulewrapper indicated that it wants to receive flush commands. + supportsFlush bool + // pendingReads is a queue of expected responses. `readerRoutine` reads each response and calls the callback in the matching pendingRead. + pendingReads chan pendingRead + // readerFinished is a channel that is closed if `readerRoutine` has finished (e.g. because of a read error). + readerFinished chan struct{} +} + +// pendingRead represents an expected response from the modulewrapper. +type pendingRead struct { + // barrierCallback is called as soon as this pendingRead is the next in the queue, before any read from the modulewrapper. + barrierCallback func() + + // callback is called with the result from the modulewrapper. If this is nil then no read is performed. + callback func(result [][]byte) error + // cmd is the command that requested this read for logging purposes. + cmd string + expectedNumResults int } // New returns a new Subprocess middle layer that runs the given binary. @@ -61,13 +82,18 @@ func New(path string) (*Subprocess, error) { return NewWithIO(cmd, stdin, stdout), nil } +// maxPending is the maximum number of requests that can be in the pipeline. +const maxPending = 4096 + // NewWithIO returns a new Subprocess middle layer with the given ReadCloser and // WriteCloser. The returned Subprocess will call Wait on the Cmd when closed. func NewWithIO(cmd *exec.Cmd, in io.WriteCloser, out io.ReadCloser) *Subprocess { m := &Subprocess{ - cmd: cmd, - stdin: in, - stdout: out, + cmd: cmd, + stdin: in, + stdout: out, + pendingReads: make(chan pendingRead, maxPending), + readerFinished: make(chan struct{}), } m.primitives = map[string]primitive{ @@ -107,16 +133,16 @@ func NewWithIO(cmd *exec.Cmd, in io.WriteCloser, out io.ReadCloser) *Subprocess "hmacDRBG": &drbg{"hmacDRBG", map[string]bool{"SHA-1": true, "SHA2-224": true, "SHA2-256": true, "SHA2-384": true, "SHA2-512": true}}, "KDF": &kdfPrimitive{}, "KDA": &hkdf{}, + "TLS-v1.2": &tlsKDF{}, + "TLS-v1.3": &tls13{}, "CMAC-AES": &keyedMACPrimitive{"CMAC-AES"}, "RSA": &rsa{}, - "kdf-components": &kdfComp{"kdf-components"}, - "TLS-v1.2": &kdfComp{"TLS-v1.2"}, "KAS-ECC-SSC": &kas{}, "KAS-FFC-SSC": &kasDH{}, - "PBKDF": &pbkdf{}, } m.primitives["ECDSA"] = &ecdsa{"ECDSA", map[string]bool{"P-224": true, "P-256": true, "P-384": true, "P-521": true}, m.primitives} + go m.readerRoutine() return m } @@ -125,10 +151,58 @@ func (m *Subprocess) Close() { m.stdout.Close() m.stdin.Close() m.cmd.Wait() + close(m.pendingReads) + <-m.readerFinished +} + +func (m *Subprocess) flush() error { + if !m.supportsFlush { + return nil + } + + const cmd = "flush" + buf := make([]byte, 8, 8+len(cmd)) + binary.LittleEndian.PutUint32(buf, 1) + binary.LittleEndian.PutUint32(buf[4:], uint32(len(cmd))) + buf = append(buf, []byte(cmd)...) + + if _, err := m.stdin.Write(buf); err != nil { + return err + } + return nil } -// Transact performs a single request--response pair with the subprocess. -func (m *Subprocess) Transact(cmd string, expectedResults int, args ...[]byte) ([][]byte, error) { +func (m *Subprocess) enqueueRead(pending pendingRead) error { + select { + case <-m.readerFinished: + panic("attempted to enqueue request after the reader failed") + default: + } + + select { + case m.pendingReads <- pending: + break + default: + // `pendingReads` is full. Ensure that the modulewrapper will process + // some outstanding requests to free up space in the queue. + if err := m.flush(); err != nil { + return err + } + m.pendingReads <- pending + } + + return nil +} + +// TransactAsync performs a single request--response pair with the subprocess. +// The callback will run at some future point, in a separate goroutine. All +// callbacks will, however, be run in the order that TransactAsync was called. +// Use Flush to wait for all outstanding callbacks. +func (m *Subprocess) TransactAsync(cmd string, expectedNumResults int, args [][]byte, callback func(result [][]byte) error) { + if err := m.enqueueRead(pendingRead{nil, callback, cmd, expectedNumResults}); err != nil { + panic(err) + } + argLength := len(cmd) for _, arg := range args { argLength += len(arg) @@ -146,22 +220,93 @@ func (m *Subprocess) Transact(cmd string, expectedResults int, args ...[]byte) ( } if _, err := m.stdin.Write(buf); err != nil { - return nil, fmt.Errorf("Failed to write buff: %s", err) + panic(err) + } +} + +// Flush tells the subprocess to complete all outstanding requests and waits +// for all outstanding TransactAsync callbacks to complete. +func (m *Subprocess) Flush() error { + if m.supportsFlush { + m.flush() + } + + done := make(chan struct{}) + if err := m.enqueueRead(pendingRead{barrierCallback: func() { + close(done) + }}); err != nil { + return err } - buf = buf[:4] + <-done + return nil +} + +// Barrier runs callback after all outstanding TransactAsync callbacks have +// been run. +func (m *Subprocess) Barrier(callback func()) error { + return m.enqueueRead(pendingRead{barrierCallback: callback}) +} + +func (m *Subprocess) Transact(cmd string, expectedNumResults int, args ...[]byte) ([][]byte, error) { + done := make(chan struct{}) + var result [][]byte + m.TransactAsync(cmd, expectedNumResults, args, func(r [][]byte) error { + result = r + close(done) + return nil + }) + + if err := m.flush(); err != nil { + return nil, err + } + + select { + case <-done: + return result, nil + case <-m.readerFinished: + panic("was still waiting for a result when the reader finished") + } +} + +func (m *Subprocess) readerRoutine() { + defer close(m.readerFinished) + + for pendingRead := range m.pendingReads { + if pendingRead.barrierCallback != nil { + pendingRead.barrierCallback() + } + + if pendingRead.callback == nil { + continue + } + + result, err := m.readResult(pendingRead.cmd, pendingRead.expectedNumResults) + if err != nil { + panic(fmt.Errorf("failed to read from subprocess: %w", err)) + } + + if err := pendingRead.callback(result); err != nil { + panic(fmt.Errorf("result from subprocess was rejected: %w", err)) + } + } +} + +func (m *Subprocess) readResult(cmd string, expectedNumResults int) ([][]byte, error) { + buf := make([]byte, 4) + if _, err := io.ReadFull(m.stdout, buf); err != nil { - return nil, fmt.Errorf("Failed to read the length of sections section: %s", err) + return nil, err } numResults := binary.LittleEndian.Uint32(buf) - if int(numResults) != expectedResults { - return nil, fmt.Errorf("expected %d results from %q but got %d", expectedResults, cmd, numResults) + if int(numResults) != expectedNumResults { + return nil, fmt.Errorf("expected %d results from %q but got %d", expectedNumResults, cmd, numResults) } buf = make([]byte, 4*numResults) if _, err := io.ReadFull(m.stdout, buf); err != nil { - return nil, fmt.Errorf("Failed to read the length of each section: %s", err) + return nil, err } var resultsLength uint64 @@ -175,7 +320,7 @@ func (m *Subprocess) Transact(cmd string, expectedResults int, args ...[]byte) ( results := make([]byte, resultsLength) if _, err := io.ReadFull(m.stdout, results); err != nil { - return nil, fmt.Errorf("Failed to read total results: %s", err) + return nil, err } ret := make([][]byte, 0, numResults) @@ -198,16 +343,25 @@ func (m *Subprocess) Config() ([]byte, error) { return nil, err } var config []struct { - Algorithm string `json:"algorithm"` + Algorithm string `json:"algorithm"` + Features []string `json:"features"` } if err := json.Unmarshal(results[0], &config); err != nil { return nil, errors.New("failed to parse config response from wrapper: " + err.Error()) } for _, algo := range config { - if _, ok := m.primitives[algo.Algorithm]; !ok { + if algo.Algorithm == "acvptool" { + for _, feature := range algo.Features { + switch feature { + case "batch": + m.supportsFlush = true + } + } + } else if _, ok := m.primitives[algo.Algorithm]; !ok { return nil, fmt.Errorf("wrapper config advertises support for unknown algorithm %q", algo.Algorithm) } } + return results[0], nil }