Skip to content

Commit

Permalink
Close files in CSV writer
Browse files Browse the repository at this point in the history
  • Loading branch information
nikugogoi committed Jun 23, 2022
1 parent 2a9e07d commit 043502a
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 22 deletions.
2 changes: 1 addition & 1 deletion cmd/geth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {

indexerConfig = file.Config{
Mode: fileMode,
OutputDir: ctx.GlobalString(utils.StateDiffFileCsvOutput.Name),
OutputDir: ctx.GlobalString(utils.StateDiffFileCsvDir.Name),
FilePath: ctx.GlobalString(utils.StateDiffFilePath.Name),
WatchedAddressesFilePath: ctx.GlobalString(utils.StateDiffWatchedAddressesFilePath.Name),
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ var (
utils.StateDiffWritingFlag,
utils.StateDiffWorkersFlag,
utils.StateDiffFileMode,
utils.StateDiffFileCsvOutput,
utils.StateDiffFileCsvDir,
utils.StateDiffFilePath,
utils.StateDiffKnownGapsFilePath,
utils.StateDiffWaitForSync,
Expand Down
2 changes: 1 addition & 1 deletion cmd/geth/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.StateDiffWritingFlag,
utils.StateDiffWorkersFlag,
utils.StateDiffFileMode,
utils.StateDiffFileCsvOutput,
utils.StateDiffFileCsvDir,
utils.StateDiffFilePath,
utils.StateDiffKnownGapsFilePath,
utils.StateDiffWaitForSync,
Expand Down
4 changes: 2 additions & 2 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -907,8 +907,8 @@ var (
Usage: "Statediff file writing mode (current options: csv, sql)",
Value: "csv",
}
StateDiffFileCsvOutput = cli.StringFlag{
Name: "statediff.file.csvoutput",
StateDiffFileCsvDir = cli.StringFlag{
Name: "statediff.file.csvdir",
Usage: "Full path of output directory to write statediff data out to when operating in csv file mode",
}
StateDiffFilePath = cli.StringFlag{
Expand Down
2 changes: 1 addition & 1 deletion statediff/indexer/database/file/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func ResolveFileMode(str string) (FileMode, error) {
}
}

// Config holds params for writing CSV files out to a directory
// Config holds params for writing out CSV or SQL files
type Config struct {
Mode FileMode
OutputDir string
Expand Down
38 changes: 27 additions & 11 deletions statediff/indexer/database/file/csv_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type CSVWriter struct {

// fileWriter couples a CSV encoder with the file handle it writes into, so
// the handle stays reachable and can be closed once writing is finished.
type fileWriter struct {
	// Embedded encoder; its methods (Write, Flush, ...) are promoted onto
	// fileWriter.
	*csv.Writer

	// Handle the embedded encoder was constructed over.
	file *os.File
}

// fileWriters wraps the file writers for each output table
Expand All @@ -77,13 +78,13 @@ func newFileWriter(path string) (ret fileWriter, err error) {
if err != nil {
return
}
ret = fileWriter{csv.NewWriter(file)}
return
}

func (tx fileWriters) write(tbl *types.Table, args ...interface{}) error {
row := tbl.ToCsvRow(args...)
return tx[tbl.Name].Write(row)
ret = fileWriter{
Writer: csv.NewWriter(file),
file: file,
}

return
}

func makeFileWriters(dir string, tables []*types.Table) (fileWriters, error) {
Expand All @@ -101,6 +102,21 @@ func makeFileWriters(dir string, tables []*types.Table) (fileWriters, error) {
return writers, nil
}

// write converts args into a CSV row for tbl and hands that row to the writer
// registered under tbl.Name. It returns an error when no usable writer is
// registered for the table, or whatever error the underlying csv.Writer
// reports from Write.
func (tx fileWriters) write(tbl *types.Table, args ...interface{}) error {
	w, ok := tx[tbl.Name]
	if !ok || w.Writer == nil {
		// Indexing a missing table yields a zero fileWriter whose embedded
		// *csv.Writer is nil; calling Write on it would crash with a nil
		// dereference instead of surfacing a diagnosable error.
		return fmt.Errorf("no csv writer for table %q", tbl.Name)
	}
	return w.Write(tbl.ToCsvRow(args...))
}

// close closes the file behind every writer in tx. Every file is attempted
// even if an earlier one fails, so a single failure does not leave the
// remaining handles open. The first error encountered (if any) is returned.
func (tx fileWriters) close() error {
	var firstErr error
	for _, w := range tx {
		if err := w.file.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

func (tx fileWriters) flush() error {
for _, w := range tx {
w.Flush()
Expand Down Expand Up @@ -137,10 +153,10 @@ func (csw *CSVWriter) Loop() {
for {
select {
case row := <-csw.rows:
// TODO: Check available buffer size and flush
csw.writers.flush()

csw.writers.write(&row.table, row.values...)
err := csw.writers.write(&row.table, row.values...)
if err != nil {
panic(fmt.Sprintf("error writing csv buffer: %v", err))
}
case <-csw.quitChan:
if err := csw.writers.flush(); err != nil {
panic(fmt.Sprintf("error writing csv buffer to file: %v", err))
Expand Down Expand Up @@ -168,7 +184,7 @@ func TableFile(dir, name string) string { return filepath.Join(dir, name+".csv")
// Close signals the writer loop to stop by closing quitChan, blocks until a
// receive on doneChan completes, and then closes the underlying table files.
// It returns any error reported while closing those files.
func (csw *CSVWriter) Close() error {
	close(csw.quitChan)
	<-csw.doneChan
	// Release the file handles only after the loop has signalled completion.
	return csw.writers.close()
}

func (csw *CSVWriter) upsertNode(node nodeinfo.Info) {
Expand Down
13 changes: 9 additions & 4 deletions statediff/indexer/database/file/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@ import (

// Writer interface required by the file indexer
type FileWriter interface {
// Methods used to control the writer
Loop()
Close() error
Flush()

// Methods to write out data to tables
upsertNode(node nodeinfo.Info)
upsertIPLD(ipld models.IPLDModel)
upsertIPLDDirect(blockNumber, key string, value []byte)
upsertIPLDNode(blockNumber string, i node.Node)
upsertIPLDRaw(blockNumber string, codec, mh uint64, raw []byte) (string, string, error)
upsertHeaderCID(header models.HeaderModel)
upsertUncleCID(uncle models.UncleModel)
upsertTransactionCID(transaction models.TxModel)
Expand All @@ -42,4 +41,10 @@ type FileWriter interface {
upsertStateCID(stateNode models.StateNodeModel)
upsertStateAccount(stateAccount models.StateAccountModel)
upsertStorageCID(storageCID models.StorageNodeModel)
upsertIPLD(ipld models.IPLDModel)

// Methods to upsert IPLD in different ways
upsertIPLDDirect(blockNumber, key string, value []byte)
upsertIPLDNode(blockNumber string, i node.Node)
upsertIPLDRaw(blockNumber string, codec, mh uint64, raw []byte) (string, string, error)
}
2 changes: 1 addition & 1 deletion statediff/types/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ var TableLog = Table{
}

var TableStateAccount = Table{
"eth.state_account",
"eth.state_accounts",
[]column{
{name: "block_number", typ: bigint},
{name: "header_id", typ: varchar},
Expand Down

0 comments on commit 043502a

Please sign in to comment.