Skip to content

Commit

Permalink
Additional logging ctx (#401)
Browse files Browse the repository at this point in the history
* More logging with context
  • Loading branch information
irees authored Jan 14, 2025
1 parent 06d61be commit f367913
Show file tree
Hide file tree
Showing 84 changed files with 741 additions and 984 deletions.
3 changes: 2 additions & 1 deletion cmd/transitland/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
_ "embed"
"os"

Expand Down Expand Up @@ -31,7 +32,7 @@ func (cmd *versionCommand) Parse(args []string) error {
return nil
}

func (cmd *versionCommand) Run() error {
func (cmd *versionCommand) Run(ctx context.Context) error {
log.Print("transitland-lib version: %s", tl.Version.Tag)
log.Print("transitland-lib commit: https://github.com/interline-io/transitland-lib/commit/%s (time: %s)", tl.Version.Commit, tl.Version.CommitTime)
log.Print("GTFS specification version: https://github.com/google/transit/blob/%s/gtfs/spec/en/reference.md", tl.GTFSVERSION)
Expand Down
3 changes: 2 additions & 1 deletion cmds/copy_cmd.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmds

import (
"context"
"errors"

"github.com/interline-io/transitland-lib/adapters"
Expand Down Expand Up @@ -62,7 +63,7 @@ func (cmd *CopyCommand) Parse(args []string) error {
return nil
}

func (cmd *CopyCommand) Run() error {
func (cmd *CopyCommand) Run(ctx context.Context) error {
// Reader / Writer
reader, err := ext.OpenReader(cmd.readerPath)
if err != nil {
Expand Down
11 changes: 6 additions & 5 deletions cmds/delete_cmd.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmds

import (
"context"
"database/sql"
"errors"
"fmt"
Expand Down Expand Up @@ -56,7 +57,7 @@ func (cmd *DeleteCommand) Parse(args []string) error {
}

// Run this command
func (cmd *DeleteCommand) Run() error {
func (cmd *DeleteCommand) Run(ctx context.Context) error {
if cmd.Adapter == nil {
writer, err := tldb.OpenWriter(cmd.DBURL, true)
if err != nil {
Expand All @@ -74,18 +75,18 @@ func (cmd *DeleteCommand) Run() error {
if err != nil {
return err
}
err = cmd.Adapter.Get(&qrs, qstr, qargs...)
err = cmd.Adapter.Get(ctx, &qrs, qstr, qargs...)
if err == sql.ErrNoRows {
return fmt.Errorf("feed version %d does not exist", cmd.FVID)
} else if err != nil {
return err
}
if cmd.DryRun {
log.Info().Msgf("Deleting feed version: %d (dry run)", cmd.FVID)
log.For(ctx).Info().Msgf("Deleting feed version: %d (dry run)", cmd.FVID)
} else {
log.Info().Msgf("Deleting feed version: %d", cmd.FVID)
log.For(ctx).Info().Msgf("Deleting feed version: %d", cmd.FVID)
err := cmd.Adapter.Tx(func(atx tldb.Adapter) error {
return importer.DeleteFeedVersion(cmd.Adapter, cmd.FVID, cmd.ExtraTables)
return importer.DeleteFeedVersion(ctx, cmd.Adapter, cmd.FVID, cmd.ExtraTables)
})
if err != nil {
return err
Expand Down
8 changes: 5 additions & 3 deletions cmds/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cmds
// End to end tests for sync, fetch, and import

import (
"context"
"io/ioutil"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -108,6 +109,7 @@ func TestE2E(t *testing.T) {
expectStopTimes: 28,
},
}
ctx := context.TODO()
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -141,7 +143,7 @@ func TestE2E(t *testing.T) {
FetchedAt: time.Now(),
},
}
if err := fetch.Run(); err != nil {
if err := fetch.Run(ctx); err != nil {
t.Fatal(err)
}

Expand All @@ -155,7 +157,7 @@ func TestE2E(t *testing.T) {
Activate: tc.activate,
},
}
if err := impcmd.Run(); err != nil {
if err := impcmd.Run(ctx); err != nil {
t.Fatal(err)
}

Expand All @@ -169,7 +171,7 @@ func TestE2E(t *testing.T) {
Workers: 1,
Adapter: atx,
}
if err := unimpcmd.Run(); err != nil {
if err := unimpcmd.Run(ctx); err != nil {
t.Fatal(err)
}
}
Expand Down
7 changes: 4 additions & 3 deletions cmds/extract_cmd.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmds

import (
"context"
"errors"
"fmt"
"strconv"
Expand Down Expand Up @@ -139,7 +140,7 @@ func (cmd *ExtractCommand) Parse(args []string) error {
return nil
}

func (cmd *ExtractCommand) Run() error {
func (cmd *ExtractCommand) Run(ctx context.Context) error {
// Reader / Writer
reader, err := ext.OpenReader(cmd.readerPath)
if err != nil {
Expand Down Expand Up @@ -256,12 +257,12 @@ func (cmd *ExtractCommand) Run() error {

// Marker
if em.Count() > 0 {
log.Debugf("Extract filter: loading graph")
log.For(ctx).Debug().Msgf("Extract filter: loading graph")
if err := em.Filter(reader); err != nil {
return err
}
cp.Marker = &em
log.Debugf("Graph loading complete")
log.For(ctx).Debug().Msgf("Graph loading complete")
}

// Copy
Expand Down
37 changes: 19 additions & 18 deletions cmds/fetch_cmd.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmds

import (
"context"
"database/sql"
"errors"
"fmt"
Expand Down Expand Up @@ -76,7 +77,7 @@ func (cmd *FetchCommand) Parse(args []string) error {
}

// Run executes this command.
func (cmd *FetchCommand) Run() error {
func (cmd *FetchCommand) Run(ctx context.Context) error {
// Init
if cmd.Workers < 1 {
cmd.Workers = 1
Expand Down Expand Up @@ -126,7 +127,7 @@ func (cmd *FetchCommand) Run() error {
return err
}
feeds := []dmfr.Feed{}
err = cmd.Adapter.Select(&feeds, qstr, qargs...)
err = cmd.Adapter.Select(ctx, &feeds, qstr, qargs...)
if err != nil {
return err
}
Expand All @@ -143,17 +144,17 @@ func (cmd *FetchCommand) Run() error {
for _, osid := range cmd.FeedIDs {
// Get feed, create if not present and FeedCreate is specified
feed := dmfr.Feed{}
if err := adapter.Get(&feed, `SELECT * FROM current_feeds WHERE onestop_id = ?`, osid); err == sql.ErrNoRows && cmd.CreateFeed {
if err := adapter.Get(ctx, &feed, `SELECT * FROM current_feeds WHERE onestop_id = ?`, osid); err == sql.ErrNoRows && cmd.CreateFeed {
feed.FeedID = osid
feed.Spec = "gtfs"
if feed.ID, err = adapter.Insert(&feed); err != nil {
if feed.ID, err = adapter.Insert(ctx, &feed); err != nil {
return err
}
} else if err != nil {
return fmt.Errorf("problem with feed '%s': %s", osid, err.Error())
}
// Create feed state if not exists
if _, err := stats.GetFeedState(adapter, feed.ID); err != nil {
if _, err := stats.GetFeedState(ctx, adapter, feed.ID); err != nil {
return err
}
// Prepare options for this fetch
Expand Down Expand Up @@ -181,7 +182,7 @@ func (cmd *FetchCommand) Run() error {

///////////////
// Here we go
log.Infof("Fetching %d feeds", len(cmd.FeedIDs))
log.For(ctx).Info().Msgf("Fetching %d feeds", len(cmd.FeedIDs))
jobs := make(chan fetchJob, len(cmd.FeedIDs))
results := make(chan FetchCommandResult, len(cmd.FeedIDs))
for _, opts := range toFetch {
Expand All @@ -193,7 +194,7 @@ func (cmd *FetchCommand) Run() error {
var wg sync.WaitGroup
for w := 0; w < cmd.Workers; w++ {
wg.Add(1)
go fetchWorker(cmd.Adapter, cmd.DryRun, jobs, results, &wg)
go fetchWorker(ctx, cmd.Adapter, cmd.DryRun, jobs, results, &wg)
}
wg.Wait()
close(results)
Expand All @@ -218,9 +219,9 @@ func (cmd *FetchCommand) Run() error {
fetchNew++
}
}
log.Infof("Existing: %d New: %d Errors: %d", fetchFound, fetchNew, fetchErrs)
log.For(ctx).Info().Msgf("Existing: %d New: %d Errors: %d", fetchFound, fetchNew, fetchErrs)
if fatalError != nil {
log.Infof("Exiting with error because at least one fetch had fatal error: %s", fatalError.Error())
log.For(ctx).Info().Msgf("Exiting with error because at least one fetch had fatal error: %s", fatalError.Error())
return fatalError
}
return nil
Expand All @@ -231,12 +232,12 @@ type fetchJob struct {
fetch.Options
}

func fetchWorker(adapter tldb.Adapter, DryRun bool, jobs <-chan fetchJob, results chan<- FetchCommandResult, wg *sync.WaitGroup) {
func fetchWorker(ctx context.Context, adapter tldb.Adapter, DryRun bool, jobs <-chan fetchJob, results chan<- FetchCommandResult, wg *sync.WaitGroup) {
for job := range jobs {
// Start
log.Infof("Feed %s: start", job.OnestopID)
log.For(ctx).Info().Msgf("Feed %s: start", job.OnestopID)
if DryRun {
log.Infof("Feed %s: dry-run", job.OnestopID)
log.For(ctx).Info().Msgf("Feed %s: dry-run", job.OnestopID)
continue
}

Expand All @@ -245,23 +246,23 @@ func fetchWorker(adapter tldb.Adapter, DryRun bool, jobs <-chan fetchJob, result
t := time.Now()
fetchError := adapter.Tx(func(atx tldb.Adapter) error {
var fetchError error
result, fetchError = fetch.StaticFetch(atx, job.Options)
result, fetchError = fetch.StaticFetch(ctx, atx, job.Options)
return fetchError
})
t2 := float64(time.Now().UnixNano()-t.UnixNano()) / 1e9 // 1000000000.0

// Log result
fv := result.FeedVersion
if fetchError != nil {
log.Error().Err(fetchError).Msgf("Feed %s (id:%d): url: %s critical error: %s (t:%0.2fs)", job.OnestopID, job.Options.FeedID, result.URL, fetchError.Error(), t2)
log.For(ctx).Error().Err(fetchError).Msgf("Feed %s (id:%d): url: %s critical error: %s (t:%0.2fs)", job.OnestopID, job.Options.FeedID, result.URL, fetchError.Error(), t2)
} else if result.FetchError != nil {
log.Error().Err(result.FetchError).Msgf("Feed %s (id:%d): url: %s fetch error: %s (t:%0.2fs)", job.OnestopID, job.Options.FeedID, result.URL, result.FetchError.Error(), t2)
log.For(ctx).Error().Err(result.FetchError).Msgf("Feed %s (id:%d): url: %s fetch error: %s (t:%0.2fs)", job.OnestopID, job.Options.FeedID, result.URL, result.FetchError.Error(), t2)
} else if fv != nil && result.Found {
log.Infof("Feed %s (id:%d): url: %s found sha1: %s (id:%d) (t:%0.2fs)", job.OnestopID, job.Options.FeedID, fv.URL, fv.SHA1, fv.ID, t2)
log.For(ctx).Info().Msgf("Feed %s (id:%d): url: %s found sha1: %s (id:%d) (t:%0.2fs)", job.OnestopID, job.Options.FeedID, fv.URL, fv.SHA1, fv.ID, t2)
} else if fv != nil {
log.Infof("Feed %s (id:%d): url: %s new: %s (id:%d) (t:%0.2fs)", job.OnestopID, job.Options.FeedID, fv.URL, fv.SHA1, fv.ID, t2)
log.For(ctx).Info().Msgf("Feed %s (id:%d): url: %s new: %s (id:%d) (t:%0.2fs)", job.OnestopID, job.Options.FeedID, fv.URL, fv.SHA1, fv.ID, t2)
} else {
log.Infof("Feed %s (id:%d): url: %s invalid result (t:%0.2fs)", job.OnestopID, job.Options.FeedID, result.URL, t2)
log.For(ctx).Info().Msgf("Feed %s (id:%d): url: %s invalid result (t:%0.2fs)", job.OnestopID, job.Options.FeedID, result.URL, t2)
}
results <- FetchCommandResult{
Result: result.Result,
Expand Down
5 changes: 3 additions & 2 deletions cmds/fetch_cmd_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmds

import (
"context"
"io/ioutil"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -65,7 +66,7 @@ func TestFetchCommand(t *testing.T) {
fatalErrorContains: "file does not exist or invalid data",
},
}
_ = cases
ctx := context.TODO()
for _, exp := range cases {
t.Run("", func(t *testing.T) {
adapter := testdb.MustOpenWriter("sqlite3://:memory:", true).Adapter
Expand All @@ -80,7 +81,7 @@ func TestFetchCommand(t *testing.T) {
if err := c.Parse(exp.command); err != nil {
t.Fatal(err)
}
if err := c.Run(); err != nil && exp.fatalErrorContains != "" {
if err := c.Run(ctx); err != nil && exp.fatalErrorContains != "" {
if !strings.Contains(err.Error(), exp.fatalErrorContains) {
t.Errorf("got '%s' error, expected to contain '%s'", err.Error(), exp.fatalErrorContains)
}
Expand Down
7 changes: 4 additions & 3 deletions cmds/format_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmds

import (
"bytes"
"context"
"errors"
"fmt"
"os"
Expand Down Expand Up @@ -38,15 +39,15 @@ func (cmd *FormatCommand) Parse(args []string) error {
}

// Run this command.
func (cmd *FormatCommand) Run() error {
func (cmd *FormatCommand) Run(ctx context.Context) error {
filename := cmd.Filename
if filename == "" {
return errors.New("must specify filename")
}
// First, validate DMFR
_, err := dmfr.LoadAndParseRegistry(filename)
if err != nil {
log.Errorf("%s: Error when loading DMFR: %s", filename, err.Error())
log.For(ctx).Error().Msgf("%s: Error when loading DMFR: %s", filename, err.Error())
}

// Re-read as raw registry
Expand All @@ -56,7 +57,7 @@ func (cmd *FormatCommand) Run() error {
}
rr, err := dmfr.ReadRawRegistry(r)
if err != nil {
log.Errorf("%s: Error when loading DMFR: %s", filename, err.Error())
log.For(ctx).Error().Msgf("%s: Error when loading DMFR: %s", filename, err.Error())
}
var buf bytes.Buffer
if err := rr.Write(&buf); err != nil {
Expand Down
Loading

0 comments on commit f367913

Please sign in to comment.