Skip to content

Commit

Permalink
Update layer store to sync transaction files before committing
Browse files Browse the repository at this point in the history
Fixes case where shutdown occurs before content is synced to disked
on layer creation. This case can leave the layer store in an bad
state and require manual recovery. This change ensures all files
are synced to disk before a layer is committed. Any shutdown that
occurs will only cause the layer to not show up but will allow it to
be repulled or recreated without error.

Added generic io logic to ioutils package to abstract it out of
the layer store package.


Signed-off-by: Derek McGowan <[email protected]>
  • Loading branch information
dmcgowan committed Aug 9, 2016
1 parent 2684459 commit c37bd10
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 16 deletions.
28 changes: 14 additions & 14 deletions layer/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type fileMetadataStore struct {

type fileMetadataTransaction struct {
store *fileMetadataStore
root string
ws *ioutils.AtomicWriteSet
}

// NewFSMetadataStore returns an instance of a metadata store
Expand Down Expand Up @@ -71,45 +71,44 @@ func (fms *fileMetadataStore) StartTransaction() (MetadataTransaction, error) {
if err := os.MkdirAll(tmpDir, 0755); err != nil {
return nil, err
}

td, err := ioutil.TempDir(tmpDir, "layer-")
ws, err := ioutils.NewAtomicWriteSet(tmpDir)
if err != nil {
return nil, err
}
// Create a new tempdir

return &fileMetadataTransaction{
store: fms,
root: td,
ws: ws,
}, nil
}

func (fm *fileMetadataTransaction) SetSize(size int64) error {
content := fmt.Sprintf("%d", size)
return ioutil.WriteFile(filepath.Join(fm.root, "size"), []byte(content), 0644)
return fm.ws.WriteFile("size", []byte(content), 0644)
}

func (fm *fileMetadataTransaction) SetParent(parent ChainID) error {
return ioutil.WriteFile(filepath.Join(fm.root, "parent"), []byte(digest.Digest(parent).String()), 0644)
return fm.ws.WriteFile("parent", []byte(digest.Digest(parent).String()), 0644)
}

func (fm *fileMetadataTransaction) SetDiffID(diff DiffID) error {
return ioutil.WriteFile(filepath.Join(fm.root, "diff"), []byte(digest.Digest(diff).String()), 0644)
return fm.ws.WriteFile("diff", []byte(digest.Digest(diff).String()), 0644)
}

func (fm *fileMetadataTransaction) SetCacheID(cacheID string) error {
return ioutil.WriteFile(filepath.Join(fm.root, "cache-id"), []byte(cacheID), 0644)
return fm.ws.WriteFile("cache-id", []byte(cacheID), 0644)
}

func (fm *fileMetadataTransaction) SetDescriptor(ref distribution.Descriptor) error {
jsonRef, err := json.Marshal(ref)
if err != nil {
return err
}
return ioutil.WriteFile(filepath.Join(fm.root, "descriptor.json"), jsonRef, 0644)
return fm.ws.WriteFile("descriptor.json", jsonRef, 0644)
}

func (fm *fileMetadataTransaction) TarSplitWriter(compressInput bool) (io.WriteCloser, error) {
f, err := os.OpenFile(filepath.Join(fm.root, "tar-split.json.gz"), os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
f, err := fm.ws.FileWriter("tar-split.json.gz", os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nil, err
}
Expand All @@ -131,15 +130,16 @@ func (fm *fileMetadataTransaction) Commit(layer ChainID) error {
if err := os.MkdirAll(filepath.Dir(finalDir), 0755); err != nil {
return err
}
return os.Rename(fm.root, finalDir)

return fm.ws.Commit(finalDir)
}

func (fm *fileMetadataTransaction) Cancel() error {
return os.RemoveAll(fm.root)
return fm.ws.Cancel()
}

func (fm *fileMetadataTransaction) String() string {
return fm.root
return fm.ws.String()
}

func (fms *fileMetadataStore) GetSize(layer ChainID) (int64, error) {
Expand Down
80 changes: 80 additions & 0 deletions pkg/ioutils/fswriters.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,83 @@ func (w *atomicFileWriter) Close() (retErr error) {
}
return nil
}

// AtomicWriteSet is used to atomically write a set
// of files and ensure they are visible at the same time.
// Must be committed to a new directory.
type AtomicWriteSet struct {
root string
}

// NewAtomicWriteSet creates a new atomic write set to
// atomically create a set of files. The given directory
// is used as the base directory for storing files before
// commit. If no temporary directory is given the system
// default is used.
func NewAtomicWriteSet(tmpDir string) (*AtomicWriteSet, error) {
td, err := ioutil.TempDir(tmpDir, "write-set-")
if err != nil {
return nil, err
}

return &AtomicWriteSet{
root: td,
}, nil
}

// WriteFile writes a file to the set, guaranteeing the file
// has been synced.
func (ws *AtomicWriteSet) WriteFile(filename string, data []byte, perm os.FileMode) error {
f, err := ws.FileWriter(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
if err != nil {
return err
}
n, err := f.Write(data)
if err == nil && n < len(data) {
err = io.ErrShortWrite
}
if err1 := f.Close(); err == nil {
err = err1
}
return err
}

type syncFileCloser struct {
*os.File
}

func (w syncFileCloser) Close() error {
err := w.File.Sync()
if err1 := w.File.Close(); err == nil {
err = err1
}
return err
}

// FileWriter opens a file writer inside the set. The file
// should be synced and closed before calling commit.
func (ws *AtomicWriteSet) FileWriter(name string, flag int, perm os.FileMode) (io.WriteCloser, error) {
f, err := os.OpenFile(filepath.Join(ws.root, name), flag, perm)
if err != nil {
return nil, err
}
return syncFileCloser{f}, nil
}

// Cancel cancels the set and removes all temporary data
// created in the set.
func (ws *AtomicWriteSet) Cancel() error {
return os.RemoveAll(ws.root)
}

// Commit moves all created files to the target directory. The
// target directory must not exist and the parent of the target
// directory must exist.
func (ws *AtomicWriteSet) Commit(target string) error {
return os.Rename(ws.root, target)
}

// String returns the location the set is writing to.
func (ws *AtomicWriteSet) String() string {
return ws.root
}
97 changes: 95 additions & 2 deletions pkg/ioutils/fswriters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,21 @@ import (
"io/ioutil"
"os"
"path/filepath"
"runtime"
"testing"
)

var (
testMode os.FileMode = 0640
)

func init() {
// Windows does not support full Linux file mode
if runtime.GOOS == "windows" {
testMode = 0666
}
}

func TestAtomicWriteToFile(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "atomic-writers-test")
if err != nil {
Expand All @@ -16,7 +28,7 @@ func TestAtomicWriteToFile(t *testing.T) {
defer os.RemoveAll(tmpDir)

expected := []byte("barbaz")
if err := AtomicWriteFile(filepath.Join(tmpDir, "foo"), expected, 0666); err != nil {
if err := AtomicWriteFile(filepath.Join(tmpDir, "foo"), expected, testMode); err != nil {
t.Fatalf("Error writing to file: %v", err)
}

Expand All @@ -33,7 +45,88 @@ func TestAtomicWriteToFile(t *testing.T) {
if err != nil {
t.Fatalf("Error statting file: %v", err)
}
if expected := os.FileMode(0666); st.Mode() != expected {
if expected := os.FileMode(testMode); st.Mode() != expected {
t.Fatalf("Mode mismatched, expected %o, got %o", expected, st.Mode())
}
}

func TestAtomicWriteSetCommit(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "atomic-writerset-test")
if err != nil {
t.Fatalf("Error when creating temporary directory: %s", err)
}
defer os.RemoveAll(tmpDir)

if err := os.Mkdir(filepath.Join(tmpDir, "tmp"), 0700); err != nil {
t.Fatalf("Error creating tmp directory: %s", err)
}

targetDir := filepath.Join(tmpDir, "target")
ws, err := NewAtomicWriteSet(filepath.Join(tmpDir, "tmp"))
if err != nil {
t.Fatalf("Error creating atomic write set: %s", err)
}

expected := []byte("barbaz")
if err := ws.WriteFile("foo", expected, testMode); err != nil {
t.Fatalf("Error writing to file: %v", err)
}

if _, err := ioutil.ReadFile(filepath.Join(targetDir, "foo")); err == nil {
t.Fatalf("Expected error reading file where should not exist")
}

if err := ws.Commit(targetDir); err != nil {
t.Fatalf("Error committing file: %s", err)
}

actual, err := ioutil.ReadFile(filepath.Join(targetDir, "foo"))
if err != nil {
t.Fatalf("Error reading from file: %v", err)
}

if bytes.Compare(actual, expected) != 0 {
t.Fatalf("Data mismatch, expected %q, got %q", expected, actual)
}

st, err := os.Stat(filepath.Join(targetDir, "foo"))
if err != nil {
t.Fatalf("Error statting file: %v", err)
}
if expected := os.FileMode(testMode); st.Mode() != expected {
t.Fatalf("Mode mismatched, expected %o, got %o", expected, st.Mode())
}

}

func TestAtomicWriteSetCancel(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "atomic-writerset-test")
if err != nil {
t.Fatalf("Error when creating temporary directory: %s", err)
}
defer os.RemoveAll(tmpDir)

if err := os.Mkdir(filepath.Join(tmpDir, "tmp"), 0700); err != nil {
t.Fatalf("Error creating tmp directory: %s", err)
}

ws, err := NewAtomicWriteSet(filepath.Join(tmpDir, "tmp"))
if err != nil {
t.Fatalf("Error creating atomic write set: %s", err)
}

expected := []byte("barbaz")
if err := ws.WriteFile("foo", expected, testMode); err != nil {
t.Fatalf("Error writing to file: %v", err)
}

if err := ws.Cancel(); err != nil {
t.Fatalf("Error committing file: %s", err)
}

if _, err := ioutil.ReadFile(filepath.Join(tmpDir, "target", "foo")); err == nil {
t.Fatalf("Expected error reading file where should not exist")
} else if !os.IsNotExist(err) {
t.Fatalf("Unexpected error reading file: %s", err)
}
}

0 comments on commit c37bd10

Please sign in to comment.