Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store): correct corrupted files on write #3859

Merged
merged 11 commits into from
Oct 22, 2024
36 changes: 36 additions & 0 deletions store/file/ods.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,27 @@ func writeAxisRoots(w io.Writer, roots *share.AxisRoots) error {
return nil
}

// ValidateODSSize checks if the file under given FS path has the expected size.
func ValidateODSSize(path string, eds *rsmt2d.ExtendedDataSquare) error {
ods, err := OpenODS(path)
if err != nil {
return fmt.Errorf("opening file: %w", err)
}

shares := filledSharesAmount(eds)
shareSize := len(eds.GetCell(0, 0))
expectedSize := ods.hdr.OffsetWithRoots() + shares*shareSize

info, err := ods.fl.Stat()
if err != nil {
return fmt.Errorf("getting file info: %w", err)
}
if info.Size() != int64(expectedSize) {
return fmt.Errorf("file size mismatch: expected %d, got %d", expectedSize, info.Size())
}
return nil
}

// OpenODS opens an existing ODS file under given FS path.
// It only reads the header with metadata. The other content
// of the File is read lazily.
Expand Down Expand Up @@ -414,3 +435,18 @@ func readColHalf(r io.ReaderAt, colIdx int, hdr *headerV0, offset int) ([]share.
}
return shares, nil
}

// filledSharesAmount returns the amount of shares in the ODS that are not tail padding.
func filledSharesAmount(eds *rsmt2d.ExtendedDataSquare) int {
var amount int
for i := range eds.Width() / 2 {
for j := range eds.Width() / 2 {
shr := eds.GetCell(i, j)
if share.GetNamespace(shr).Equals(share.TailPaddingNamespace) {
break
}
amount++
}
}
return amount
}
13 changes: 13 additions & 0 deletions store/file/ods_q4.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,19 @@ func CreateODSQ4(
return nil
}

// ValidateODSQ4Size checks the size of the ODS and Q4 files under the given FS paths.
func ValidateODSQ4Size(pathODS, pathQ4 string, eds *rsmt2d.ExtendedDataSquare) error {
err := ValidateODSSize(pathODS, eds)
if err != nil {
return fmt.Errorf("validating ODS file size: %w", err)
}
err = validateQ4Size(pathQ4, eds)
if err != nil {
return fmt.Errorf("validating Q4 file size: %w", err)
}
return nil
}

// ODSWithQ4 returns ODSQ4 instance over ODS. It opens Q4 file lazily under the given path.
func ODSWithQ4(ods *ODS, pathQ4 string) *ODSQ4 {
return &ODSQ4{
Expand Down
92 changes: 92 additions & 0 deletions store/file/ods_q4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package file

import (
"context"
"fmt"
"io"
"os"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -39,6 +42,95 @@ func TestODSQ4File(t *testing.T) {
eds.TestStreamer(ctx, t, createODSQ4AccessorStreamer, ODSSize)
}

func TestValidateODSQ4FileSize(t *testing.T) {
edses := []struct {
name string
eds *rsmt2d.ExtendedDataSquare
}{
{
name: "no padding",
eds: edstest.RandEDS(t, 8),
},
{
name: "with padding",
eds: edstest.RandEDSWithTailPadding(t, 8, 11),
},
{
name: "empty", eds: share.EmptyEDS(),
},
}

tests := []struct {
name string
createFile func(pathODS, pathQ4 string, roots *share.AxisRoots, eds *rsmt2d.ExtendedDataSquare) error
valid bool
}{
{
name: "valid",
createFile: CreateODSQ4,
valid: true,
},
{
name: "shorter q4",
createFile: func(pathODS, pathQ4 string, roots *share.AxisRoots, eds *rsmt2d.ExtendedDataSquare) error {
err := CreateODSQ4(pathODS, pathQ4, roots, eds)
if err != nil {
return err
}
file, err := os.OpenFile(pathQ4, os.O_RDWR, 0)
if err != nil {
return err
}
defer file.Close()
info, err := file.Stat()
if err != nil {
return err
}
return file.Truncate(info.Size() - 1)
},
valid: false,
},
{
name: "longer q4",
createFile: func(pathODS, pathQ4 string, roots *share.AxisRoots, eds *rsmt2d.ExtendedDataSquare) error {
err := CreateODSQ4(pathODS, pathQ4, roots, eds)
if err != nil {
return err
}
file, err := os.OpenFile(pathQ4, os.O_RDWR, 0)
if err != nil {
return err
}
defer file.Close()
// append 1 byte to the file
_, err = file.Seek(0, io.SeekEnd)
if err != nil {
return err
}
_, err = file.Write([]byte{0})
return err
},
valid: false,
},
}

for _, tt := range tests {
for _, eds := range edses {
t.Run(fmt.Sprintf("%s/%s", tt.name, eds.name), func(t *testing.T) {
pathODS := t.TempDir() + tt.name + eds.name
pathQ4 := pathODS + ".q4"
roots, err := share.NewAxisRoots(eds.eds)
require.NoError(t, err)
err = tt.createFile(pathODS, pathQ4, roots, eds.eds)
require.NoError(t, err)

err = ValidateODSQ4Size(pathODS, pathQ4, eds.eds)
require.Equal(t, tt.valid, err == nil)
})
}
}
}

// BenchmarkAxisFromODSQ4File/Size:32/ProofType:row/squareHalf:0-16 354836 3345 ns/op
// BenchmarkAxisFromODSQ4File/Size:32/ProofType:row/squareHalf:1-16 339547 3187 ns/op
// BenchmarkAxisFromODSQ4File/Size:32/ProofType:col/squareHalf:0-16 69364 16440 ns/op
Expand Down
91 changes: 91 additions & 0 deletions store/file/ods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package file

import (
"context"
"fmt"
"io"
"os"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -61,6 +64,94 @@ func TestODSFile(t *testing.T) {
eds.TestStreamer(ctx, t, createODSAccessorStreamer, ODSSize)
}

func TestValidateODSSize(t *testing.T) {
edses := []struct {
name string
eds *rsmt2d.ExtendedDataSquare
}{
{
name: "no padding",
eds: edstest.RandEDS(t, 8),
},
{
name: "with padding",
eds: edstest.RandEDSWithTailPadding(t, 8, 11),
},
{
name: "empty", eds: share.EmptyEDS(),
},
}

tests := []struct {
name string
createFile func(path string, roots *share.AxisRoots, eds *rsmt2d.ExtendedDataSquare) error
valid bool
}{
{
name: "valid",
createFile: CreateODS,
valid: true,
},
{
name: "shorter",
createFile: func(path string, roots *share.AxisRoots, eds *rsmt2d.ExtendedDataSquare) error {
err := CreateODS(path, roots, eds)
if err != nil {
return err
}
file, err := os.OpenFile(path, os.O_RDWR, 0)
if err != nil {
return err
}
defer file.Close()
info, err := file.Stat()
if err != nil {
return err
}
return file.Truncate(info.Size() - 1)
},
valid: false,
},
{
name: "longer",
createFile: func(path string, roots *share.AxisRoots, eds *rsmt2d.ExtendedDataSquare) error {
err := CreateODS(path, roots, eds)
if err != nil {
return err
}
file, err := os.OpenFile(path, os.O_RDWR, 0)
if err != nil {
return err
}
defer file.Close()
// append 1 byte to the file
_, err = file.Seek(0, io.SeekEnd)
if err != nil {
return err
}
_, err = file.Write([]byte{0})
return err
},
valid: false,
},
}

for _, tt := range tests {
for _, eds := range edses {
t.Run(fmt.Sprintf("%s/%s", tt.name, eds.name), func(t *testing.T) {
path := t.TempDir() + tt.name + eds.name
roots, err := share.NewAxisRoots(eds.eds)
require.NoError(t, err)
err = tt.createFile(path, roots, eds.eds)
require.NoError(t, err)

err = ValidateODSSize(path, eds.eds)
require.Equal(t, tt.valid, err == nil)
})
}
}
}

// BenchmarkAxisFromODSFile/Size:32/ProofType:row/squareHalf:0-16 382011 3104 ns/op
// BenchmarkAxisFromODSFile/Size:32/ProofType:row/squareHalf:1-16 9320 122408 ns/op
// BenchmarkAxisFromODSFile/Size:32/ProofType:col/squareHalf:0-16 4408911 266.5 ns/op
Expand Down
21 changes: 21 additions & 0 deletions store/file/q4.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,27 @@ func writeQ4(w io.Writer, eds *rsmt2d.ExtendedDataSquare) error {
return nil
}

func validateQ4Size(path string, eds *rsmt2d.ExtendedDataSquare) error {
f, err := os.Open(path)
if err != nil {
return fmt.Errorf("opening file: %w", err)
}
defer f.Close()

odsSize := int(eds.Width() / 2)
shareSize := len(eds.GetCell(0, 0))
expectedSize := shareSize * odsSize * odsSize

info, err := f.Stat()
if err != nil {
return fmt.Errorf("getting file info: %w", err)
}
if info.Size() != int64(expectedSize) {
return fmt.Errorf("file size mismatch: expected %d, got %d", expectedSize, info.Size())
}
return nil
}

// openQ4 opens an existing Q4 file under given FS path.
func openQ4(path string, hdr *headerV0) (*q4, error) {
f, err := os.Open(path)
Expand Down
63 changes: 63 additions & 0 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,14 @@ func (s *Store) createODSQ4File(
)
}

// if file already exists, check if it's corrupted
if errors.Is(err, os.ErrExist) {
err = s.validateAndRecoverODSQ4(square, roots, height, pathODS, pathQ4)
if err != nil {
return false, err
}
}

// create hard link with height as name
err = s.linkHeight(roots.Hash(), height)
// if both file and link exist, we consider it as success
Expand All @@ -198,6 +206,29 @@ func (s *Store) createODSQ4File(
return false, nil
}

func (s *Store) validateAndRecoverODSQ4(
square *rsmt2d.ExtendedDataSquare,
roots *share.AxisRoots,
height uint64,
pathODS, pathQ4 string,
) error {
// Validate the size of the file to ensure it's not corrupted
err := file.ValidateODSQ4Size(pathODS, pathQ4, square)
if err == nil {
return nil
}
log.Warnf("ODSQ4 file with height %d is corrupted, recovering", height)
err = s.removeODSQ4(height, roots.Hash())
if err != nil {
return fmt.Errorf("removing corrupted ODSQ4 file: %w", err)
}
err = file.CreateODSQ4(pathODS, pathQ4, roots, square)
if err != nil {
return fmt.Errorf("recreating ODSQ4 file: %w", err)
}
return nil
}

func (s *Store) createODSFile(
square *rsmt2d.ExtendedDataSquare,
roots *share.AxisRoots,
Expand All @@ -214,6 +245,15 @@ func (s *Store) createODSFile(
)
}

// if file already exists, check if it's corrupted
if errors.Is(err, os.ErrExist) {
// Validate the size of the file to ensure it's not corrupted
err = s.validateAndRecoverODS(square, roots, height, pathODS)
if err != nil {
return false, err
}
}

// create hard link with height as name
err = s.linkHeight(roots.Hash(), height)
// if both file and link exist, we consider it as success
Expand All @@ -231,6 +271,29 @@ func (s *Store) createODSFile(
return false, nil
}

func (s *Store) validateAndRecoverODS(
square *rsmt2d.ExtendedDataSquare,
roots *share.AxisRoots,
height uint64,
pathODS string,
) error {
// Validate the size of the file to ensure it's not corrupted
err := file.ValidateODSSize(pathODS, square)
if err == nil {
return nil
}
log.Warnf("ODS file with height %d is corrupted, recovering", height)
err = s.removeODS(height, roots.Hash())
if err != nil {
return fmt.Errorf("removing corrupted ODS file: %w", err)
}
err = file.CreateODS(pathODS, roots, square)
if err != nil {
return fmt.Errorf("recreating ODS file: %w", err)
}
return nil
}

func (s *Store) linkHeight(datahash share.DataHash, height uint64) error {
linktoOds := s.heightToPath(height, odsFileExt)
if datahash.IsEmptyEDS() {
Expand Down
Loading