Skip to content

Commit

Permalink
acvptool: clean up better.
Browse files Browse the repository at this point in the history
The Close() method of the middle often wasn't getting called because
`os.Exit(0)` was used in some places. Once that's fixed, it's clear that
the queue of pending reads needed to be closed before waiting for the
reader goroutine to finish. Lastly, don't bother trying to record the
error that the reader saw: just panic the process if the modulewrapper
dies during processing.

Change-Id: Icf077cefd0ace2ef721a493f99fede6269531257
Reviewed-on: https://boringssl-review.googlesource.com/c/boringssl/+/60045
Commit-Queue: David Benjamin <[email protected]>
Auto-Submit: Adam Langley <[email protected]>
Reviewed-by: David Benjamin <[email protected]>

(cherry picked from commit cf3851c6c9380368373ac127cde1f4aa7159fba3)
  • Loading branch information
agl authored and justsmth committed Aug 14, 2023
1 parent bdb3f08 commit 8dd7c4f
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 20 deletions.
6 changes: 3 additions & 3 deletions util/fipstools/acvp/acvptool/acvp.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,14 +570,14 @@ func main() {
}
os.Stdout.Write(regcapBytes)
os.Stdout.WriteString("\n")
os.Exit(0)
return
}

if len(*jsonInputFile) > 0 {
if err := processFile(*jsonInputFile, supportedAlgos, middle); err != nil {
log.Fatalf("failed to process input file: %s", err)
}
os.Exit(0)
return
}

var config Config
Expand Down Expand Up @@ -783,7 +783,7 @@ func main() {

if len(*fetchFlag) > 0 {
io.WriteString(fetchOutputTee, "]\n")
os.Exit(0)
return
}

if ok, err := getResultsWithRetry(server, url); err != nil {
Expand Down
188 changes: 171 additions & 17 deletions util/fipstools/acvp/acvptool/subprocess/subprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import (
// that don't call a server.
type Transactable interface {
Transact(cmd string, expectedResults int, args ...[]byte) ([][]byte, error)
TransactAsync(cmd string, expectedResults int, args [][]byte, callback func([][]byte) error)
Barrier(callback func()) error
Flush() error
}

// Subprocess is a "middle" layer that interacts with a FIPS module via running
Expand All @@ -39,6 +42,24 @@ type Subprocess struct {
stdin io.WriteCloser
stdout io.ReadCloser
primitives map[string]primitive
// supportsFlush is true if the modulewrapper indicated that it wants to receive flush commands.
supportsFlush bool
// pendingReads is a queue of expected responses. `readerRoutine` reads each response and calls the callback in the matching pendingRead.
pendingReads chan pendingRead
// readerFinished is a channel that is closed if `readerRoutine` has finished (e.g. because of a read error).
readerFinished chan struct{}
}

// pendingRead represents an expected response from the modulewrapper.
type pendingRead struct {
// barrierCallback is called as soon as this pendingRead is the next in the queue, before any read from the modulewrapper.
barrierCallback func()

// callback is called with the result from the modulewrapper. If this is nil then no read is performed.
callback func(result [][]byte) error
// cmd is the command that requested this read for logging purposes.
cmd string
expectedNumResults int
}

// New returns a new Subprocess middle layer that runs the given binary.
Expand All @@ -61,13 +82,18 @@ func New(path string) (*Subprocess, error) {
return NewWithIO(cmd, stdin, stdout), nil
}

// maxPending is the maximum number of requests that can be in the pipeline.
const maxPending = 4096

// NewWithIO returns a new Subprocess middle layer with the given ReadCloser and
// WriteCloser. The returned Subprocess will call Wait on the Cmd when closed.
func NewWithIO(cmd *exec.Cmd, in io.WriteCloser, out io.ReadCloser) *Subprocess {
m := &Subprocess{
cmd: cmd,
stdin: in,
stdout: out,
cmd: cmd,
stdin: in,
stdout: out,
pendingReads: make(chan pendingRead, maxPending),
readerFinished: make(chan struct{}),
}

m.primitives = map[string]primitive{
Expand Down Expand Up @@ -107,16 +133,16 @@ func NewWithIO(cmd *exec.Cmd, in io.WriteCloser, out io.ReadCloser) *Subprocess
"hmacDRBG": &drbg{"hmacDRBG", map[string]bool{"SHA-1": true, "SHA2-224": true, "SHA2-256": true, "SHA2-384": true, "SHA2-512": true}},
"KDF": &kdfPrimitive{},
"KDA": &hkdf{},
"TLS-v1.2": &tlsKDF{},
"TLS-v1.3": &tls13{},
"CMAC-AES": &keyedMACPrimitive{"CMAC-AES"},
"RSA": &rsa{},
"kdf-components": &kdfComp{"kdf-components"},
"TLS-v1.2": &kdfComp{"TLS-v1.2"},
"KAS-ECC-SSC": &kas{},
"KAS-FFC-SSC": &kasDH{},
"PBKDF": &pbkdf{},
}
m.primitives["ECDSA"] = &ecdsa{"ECDSA", map[string]bool{"P-224": true, "P-256": true, "P-384": true, "P-521": true}, m.primitives}

go m.readerRoutine()
return m
}

Expand All @@ -125,10 +151,58 @@ func (m *Subprocess) Close() {
m.stdout.Close()
m.stdin.Close()
m.cmd.Wait()
close(m.pendingReads)
<-m.readerFinished
}

func (m *Subprocess) flush() error {
if !m.supportsFlush {
return nil
}

const cmd = "flush"
buf := make([]byte, 8, 8+len(cmd))
binary.LittleEndian.PutUint32(buf, 1)
binary.LittleEndian.PutUint32(buf[4:], uint32(len(cmd)))
buf = append(buf, []byte(cmd)...)

if _, err := m.stdin.Write(buf); err != nil {
return err
}
return nil
}

// Transact performs a single request--response pair with the subprocess.
func (m *Subprocess) Transact(cmd string, expectedResults int, args ...[]byte) ([][]byte, error) {
func (m *Subprocess) enqueueRead(pending pendingRead) error {
select {
case <-m.readerFinished:
panic("attempted to enqueue request after the reader failed")
default:
}

select {
case m.pendingReads <- pending:
break
default:
// `pendingReads` is full. Ensure that the modulewrapper will process
// some outstanding requests to free up space in the queue.
if err := m.flush(); err != nil {
return err
}
m.pendingReads <- pending
}

return nil
}

// TransactAsync performs a single request--response pair with the subprocess.
// The callback will run at some future point, in a separate goroutine. All
// callbacks will, however, be run in the order that TransactAsync was called.
// Use Flush to wait for all outstanding callbacks.
func (m *Subprocess) TransactAsync(cmd string, expectedNumResults int, args [][]byte, callback func(result [][]byte) error) {
if err := m.enqueueRead(pendingRead{nil, callback, cmd, expectedNumResults}); err != nil {
panic(err)
}

argLength := len(cmd)
for _, arg := range args {
argLength += len(arg)
Expand All @@ -146,22 +220,93 @@ func (m *Subprocess) Transact(cmd string, expectedResults int, args ...[]byte) (
}

if _, err := m.stdin.Write(buf); err != nil {
return nil, fmt.Errorf("Failed to write buff: %s", err)
panic(err)
}
}

// Flush tells the subprocess to complete all outstanding requests and waits
// for all outstanding TransactAsync callbacks to complete.
func (m *Subprocess) Flush() error {
if m.supportsFlush {
m.flush()
}

done := make(chan struct{})
if err := m.enqueueRead(pendingRead{barrierCallback: func() {
close(done)
}}); err != nil {
return err
}

buf = buf[:4]
<-done
return nil
}

// Barrier runs callback after all outstanding TransactAsync callbacks have
// been run.
func (m *Subprocess) Barrier(callback func()) error {
return m.enqueueRead(pendingRead{barrierCallback: callback})
}

func (m *Subprocess) Transact(cmd string, expectedNumResults int, args ...[]byte) ([][]byte, error) {
done := make(chan struct{})
var result [][]byte
m.TransactAsync(cmd, expectedNumResults, args, func(r [][]byte) error {
result = r
close(done)
return nil
})

if err := m.flush(); err != nil {
return nil, err
}

select {
case <-done:
return result, nil
case <-m.readerFinished:
panic("was still waiting for a result when the reader finished")
}
}

func (m *Subprocess) readerRoutine() {
defer close(m.readerFinished)

for pendingRead := range m.pendingReads {
if pendingRead.barrierCallback != nil {
pendingRead.barrierCallback()
}

if pendingRead.callback == nil {
continue
}

result, err := m.readResult(pendingRead.cmd, pendingRead.expectedNumResults)
if err != nil {
panic(fmt.Errorf("failed to read from subprocess: %w", err))
}

if err := pendingRead.callback(result); err != nil {
panic(fmt.Errorf("result from subprocess was rejected: %w", err))
}
}
}

func (m *Subprocess) readResult(cmd string, expectedNumResults int) ([][]byte, error) {
buf := make([]byte, 4)

if _, err := io.ReadFull(m.stdout, buf); err != nil {
return nil, fmt.Errorf("Failed to read the length of sections section: %s", err)
return nil, err
}

numResults := binary.LittleEndian.Uint32(buf)
if int(numResults) != expectedResults {
return nil, fmt.Errorf("expected %d results from %q but got %d", expectedResults, cmd, numResults)
if int(numResults) != expectedNumResults {
return nil, fmt.Errorf("expected %d results from %q but got %d", expectedNumResults, cmd, numResults)
}

buf = make([]byte, 4*numResults)
if _, err := io.ReadFull(m.stdout, buf); err != nil {
return nil, fmt.Errorf("Failed to read the length of each section: %s", err)
return nil, err
}

var resultsLength uint64
Expand All @@ -175,7 +320,7 @@ func (m *Subprocess) Transact(cmd string, expectedResults int, args ...[]byte) (

results := make([]byte, resultsLength)
if _, err := io.ReadFull(m.stdout, results); err != nil {
return nil, fmt.Errorf("Failed to read total results: %s", err)
return nil, err
}

ret := make([][]byte, 0, numResults)
Expand All @@ -198,16 +343,25 @@ func (m *Subprocess) Config() ([]byte, error) {
return nil, err
}
var config []struct {
Algorithm string `json:"algorithm"`
Algorithm string `json:"algorithm"`
Features []string `json:"features"`
}
if err := json.Unmarshal(results[0], &config); err != nil {
return nil, errors.New("failed to parse config response from wrapper: " + err.Error())
}
for _, algo := range config {
if _, ok := m.primitives[algo.Algorithm]; !ok {
if algo.Algorithm == "acvptool" {
for _, feature := range algo.Features {
switch feature {
case "batch":
m.supportsFlush = true
}
}
} else if _, ok := m.primitives[algo.Algorithm]; !ok {
return nil, fmt.Errorf("wrapper config advertises support for unknown algorithm %q", algo.Algorithm)
}
}

return results[0], nil
}

Expand Down

0 comments on commit 8dd7c4f

Please sign in to comment.