Skip to content
This repository has been archived by the owner on Feb 24, 2024. It is now read-only.

Commit

Permalink
Merge pull request #2214 from gochigo/graceful-shutdown
Browse files Browse the repository at this point in the history
fixed server and worker to be gracefully shut down
  • Loading branch information
sio4 authored Mar 11, 2022
2 parents f56674c + 791f6d5 commit 6ab7f0f
Show file tree
Hide file tree
Showing 7 changed files with 257 additions and 43 deletions.
23 changes: 20 additions & 3 deletions events.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
package buffalo

// TODO: TODO-v1 check if they are really need to be exported.
/* The event id should be unique across packages as the format of
"<package-name>:<additional-names>:<optional-error>" as documented. They
should not be used by another packages to keep it informational. To make
it sure, they need to be internal.
Especially for plugable conponents like servers or workers, they can have
their own event definition if they need but the buffalo runtime can emit
generalize events when e.g. the runtime calls configured worker.
*/
const (
// EvtAppStart is emitted when buffalo.App#Serve is called
EvtAppStart = "buffalo:app:start"
Expand All @@ -18,12 +27,20 @@ const (
// EvtRouteErr is emitted when there is a problem handling processing a route
EvtRouteErr = "buffalo:route:err"

// EvtWorkerStart is emitted when buffalo.App#Serve is called and workers are started
// EvtServerStart is emitted when buffalo is about to start servers
EvtServerStart = "buffalo:server:start"
// EvtServerStartErr is emitted when an error occurs when starting servers
EvtServerStartErr = "buffalo:server:start:err"
// EvtServerStop is emitted when buffalo is about to stop servers
EvtServerStop = "buffalo:server:stop"
// EvtServerStopErr is emitted when an error occurs when stopping servers
EvtServerStopErr = "buffalo:server:stop:err"

// EvtWorkerStart is emitted when buffalo is about to start workers
EvtWorkerStart = "buffalo:worker:start"
// EvtWorkerStartErr is emitted when an error occurs when starting workers
EvtWorkerStartErr = "buffalo:worker:start:err"

// EvtWorkerStop is emitted when buffalo.App#Stop is called and workers are stopped
// EvtWorkerStop is emitted when buffalo is about to stop workers
EvtWorkerStop = "buffalo:worker:stop"
// EvtWorkerStopErr is emitted when an error occurs when stopping workers
EvtWorkerStopErr = "buffalo:worker:stop:err"
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ require (
github.com/markbates/oncer v1.0.0
github.com/markbates/refresh v1.12.0
github.com/markbates/safe v1.0.1
github.com/markbates/sigtx v1.0.0
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/monoculum/formam v3.5.5+incompatible
github.com/psanford/memfs v0.0.0-20210214183328-a001468d78ef
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,6 @@ github.com/markbates/refresh v1.12.0 h1:vTgVPX4p77v26auBJhlgaTQ/adWAYSIcVYJvM63N
github.com/markbates/refresh v1.12.0/go.mod h1:Vpwi1+q+2U1VxE7C0Ilj6r2/+TigRzQcLez6XM3bPLc=
github.com/markbates/safe v1.0.1 h1:yjZkbvRM6IzKj9tlu/zMJLS0n/V351OZWRnF3QfaUxI=
github.com/markbates/safe v1.0.1/go.mod h1:nAqgmRi7cY2nqMc92/bSEeQA+R4OheNU2T1kNSCBdG0=
github.com/markbates/sigtx v1.0.0 h1:y/xtkBvNPRjD4KeEplf4w9rJVSc23/xl+jXYGowTwy0=
github.com/markbates/sigtx v1.0.0/go.mod h1:QF1Hv6Ic6Ca6W+T+DL0Y/ypborFKyvUY9HmuCD4VeTc=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
Expand Down
12 changes: 10 additions & 2 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ type Options struct {
// to "_buffalo_session".
SessionName string `json:"session_name"`

// Timeout in second for ongoing requests when shutdown the server.
// The default value is 60.
TimeoutSecondShutdown int `json:"timeout_second_shutdown"`

// Worker implements the Worker interface and can process tasks in the background.
// Default is "github.com/gobuffalo/worker.Simple.
Worker worker.Worker `json:"-"`
Expand Down Expand Up @@ -99,6 +103,7 @@ func optionsWithDefaults(opts Options) Options {
// TCP case
opts.Addr = defaults.String(opts.Addr, fmt.Sprintf("%s:%s", envAddr, envy.Get("PORT", "3000")))
}
opts.Host = defaults.String(opts.Host, envy.Get("HOST", fmt.Sprintf("http://127.0.0.1:%s", envy.Get("PORT", "3000"))))

if opts.PreWares == nil {
opts.PreWares = []PreWare{}
Expand Down Expand Up @@ -176,12 +181,15 @@ func optionsWithDefaults(opts Options) Options {

opts.SessionStore = cookieStore
}
opts.SessionName = defaults.String(opts.SessionName, "_buffalo_session")

if opts.Worker == nil {
w := worker.NewSimpleWithContext(opts.Context)
w.Logger = opts.Logger
opts.Worker = w
}
opts.SessionName = defaults.String(opts.SessionName, "_buffalo_session")
opts.Host = defaults.String(opts.Host, envy.Get("HOST", fmt.Sprintf("http://127.0.0.1:%s", envy.Get("PORT", "3000"))))

opts.TimeoutSecondShutdown = defaults.Int(opts.TimeoutSecondShutdown, 60)

return opts
}
80 changes: 53 additions & 27 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,33 @@ import (
"errors"
"net/http"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"

"github.com/gobuffalo/buffalo/servers"
"github.com/gobuffalo/events"
"github.com/markbates/refresh/refresh/web"
"github.com/markbates/sigtx"
)

// Serve the application at the specified address/port and listen for OS
// interrupt and kill signals and will attempt to stop the application
// gracefully. This will also start the Worker process, unless WorkerOff is enabled.
func (a *App) Serve(srvs ...servers.Server) error {
a.Logger.Infof("Starting application at http://%s", a.Options.Addr)
var wg sync.WaitGroup

// FIXME: this information is not correct.
// It needs to be fixed as we support multiple servers.
a.Logger.Infof("starting application at http://%s", a.Options.Addr)

payload := events.Payload{
"app": a,
}
if err := events.EmitPayload(EvtAppStart, payload); err != nil {
// just to make sure if events work properly?
a.Logger.Error("unable to emit event. something went wrong internally")
return err
}

Expand All @@ -39,70 +47,88 @@ func (a *App) Serve(srvs ...servers.Server) error {
}
}

ctx, cancel := sigtx.WithCancel(a.Context, syscall.SIGTERM, os.Interrupt)
ctx, cancel := signal.NotifyContext(a.Context, syscall.SIGTERM, os.Interrupt)
defer cancel()

wg.Add(1)
go func() {
// gracefully shut down the application when the context is cancelled
defer wg.Done()
// channel waiter should not be called any other place
<-ctx.Done()
a.Logger.Info("Shutting down application")

events.EmitError(EvtAppStop, ctx.Err(), payload)
a.Logger.Info("shutting down application")

if err := a.Stop(ctx.Err()); err != nil {
events.EmitError(EvtAppStopErr, err, payload)
a.Logger.Error(err)
// shutting down listeners first, to make sure no more new request
a.Logger.Info("shutting down servers")
for _, s := range srvs {
timeout := time.Duration(a.Options.TimeoutSecondShutdown) * time.Second
ctx, cfn := context.WithTimeout(context.Background(), timeout)
defer cfn()
events.EmitPayload(EvtServerStop, payload)
if err := s.Shutdown(ctx); err != nil {
events.EmitError(EvtServerStopErr, err, payload)
a.Logger.Error("shutting down server: ", err)
}
cfn()
}

if !a.WorkerOff {
// stop the workers
a.Logger.Info("Shutting down worker")
a.Logger.Info("shutting down worker")
events.EmitPayload(EvtWorkerStop, payload)
if err := a.Worker.Stop(); err != nil {
events.EmitError(EvtWorkerStopErr, err, payload)
a.Logger.Error(err)
}
}

for _, s := range srvs {
if err := s.Shutdown(ctx); err != nil {
a.Logger.Error(err)
a.Logger.Error("error while shutting down worker: ", err)
}
}

}()

// if configured to do so, start the workers
if !a.WorkerOff {
wg.Add(1)
go func() {
defer wg.Done()
events.EmitPayload(EvtWorkerStart, payload)
if err := a.Worker.Start(ctx); err != nil {
events.EmitError(EvtWorkerStartErr, err, payload)
a.Stop(err)
}
}()
}

for _, s := range srvs {
s.SetAddr(a.Addr)
wg.Add(1)
go func(s servers.Server) {
if err := s.Start(ctx, a); err != nil {
a.Stop(err)
}
defer wg.Done()
events.EmitPayload(EvtServerStart, payload)
// s.Start always returns non-nil error
a.Stop(s.Start(ctx, a))
}(s)
}

<-ctx.Done()
wg.Wait()
a.Logger.Info("shutdown completed")

return a.Context.Err()
err := ctx.Err()
if errors.Is(err, context.Canceled) {
return nil
}
return err
}

// Stop the application and attempt to gracefully shutdown
func (a *App) Stop(err error) error {
a.cancel()
if err != nil && !errors.Is(err, context.Canceled) {
a.Logger.Error(err)
return err
events.EmitError(EvtAppStop, err, events.Payload{"app": a})

ce := a.Context.Err()
if ce != nil {
a.Logger.Warn("application context has already been canceled: ", ce)
return errors.New("application has already been canceled")
}

a.Logger.Warn("stopping application: ", err)
a.cancel()
return nil
}

Expand Down
155 changes: 155 additions & 0 deletions server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package buffalo

import (
"fmt"
"net/http"
"sync"
"testing"
"time"

"github.com/gobuffalo/buffalo/render"
"github.com/gobuffalo/buffalo/worker"
"github.com/stretchr/testify/require"
)

// All tests in this file requires certain amount of waiting and they are
// timing sensitive. Adjust this timing values if they are failing due to
// timing issue.
const (
WAIT_START = 2
WAIT_RUN = 2
CONSUMER_RUN = 8
)

// startApp starts given buffalo app and check its exit status.
// The go routine emulates a buffalo app process.
func startApp(app *App, wg *sync.WaitGroup, r *require.Assertions) {
wg.Add(1)
go func() {
defer wg.Done()
err := app.Serve()
r.NoError(err)
}()
// wait until the server started.
// could be improved with connection test but that's too much...
time.Sleep(WAIT_START * time.Second)
}

func Test_Server_Simple(t *testing.T) {
// This testcase explains the minimum/basic workflow of buffalo app.
// Setup and execute the app, wait until startup, then stop it.
// The other testcases use this structure with additional actions.
r := require.New(t)
var wg sync.WaitGroup

// Setup a new buffalo.App to be used as a testing buffalo app.
app := New(Options{})

startApp(app, &wg, r) // starts buffalo app routine.

app.cancel()
wg.Wait()
}

var handlerDone = false

// timeConsumer consumes about 10 minutes for processing its request
func timeConsumer(c Context) error {
for i := 0; i < CONSUMER_RUN; i++ {
fmt.Println("#")
time.Sleep(1 * time.Second)
}
handlerDone = true
return c.Render(http.StatusOK, render.String("Hey!"))
}

func Test_Server_GracefulShutdownOngoingRequest(t *testing.T) {
// This test case explain the minimum/basic workflow of buffalo app.
r := require.New(t)
var wg sync.WaitGroup

// Setup a new buffalo.App with a simple time consuming handler.
app := New(Options{})
app.GET("/", timeConsumer)

startApp(app, &wg, r) // starts buffalo app routine.

firstQuery := false
secondQuery := false
// This routine is the 1st client that GETs before Stop it
// The result should be successful even though the server shutting down.
wg.Add(1)
go func() {
defer wg.Done()
resp, err := http.Get("http://127.0.0.1:3000")
r.NoError(err)
defer resp.Body.Close()
r.Equal(http.StatusOK, resp.StatusCode)
fmt.Println("the first query should be OK:", resp.Status)
firstQuery = true
}()
// make sure the request sent
time.Sleep(WAIT_RUN * time.Second)

app.cancel()
time.Sleep(1 * time.Second) // make sure the server started shutdown.

// This routine is the 2nd client that GETs after Stop it
// The result should be connection refused even though app is still on.
wg.Add(1)
go func() {
defer wg.Done()
_, err := http.Get("http://127.0.0.1:3000")
r.Contains(err.Error(), "refused")
fmt.Println("the second query should be refused:", err)
secondQuery = true
}()

wg.Wait()
r.Equal(true, handlerDone)
r.Equal(true, firstQuery)
r.Equal(true, secondQuery)
}

var timerDone = false

func timerWorker(args worker.Args) error {
for i := 0; i < CONSUMER_RUN; i++ {
fmt.Println("%")
time.Sleep(1 * time.Second)
}
timerDone = true
return nil
}

func Test_Server_GracefulShutdownOngoingWorker(t *testing.T) {
// This test case explain the minimum/basic workflow of buffalo app.
r := require.New(t)
var wg sync.WaitGroup

// Setup a new buffalo.App with a simple time consuming handler.
app := New(Options{})
app.Worker.Register("timer", timerWorker)
app.Worker.PerformIn(worker.Job{
Handler: "timer",
}, 1*time.Second)

startApp(app, &wg, r) // starts buffalo app routine.

time.Sleep(1 * time.Second) // make sure just 1 second

app.cancel()
time.Sleep(1 * time.Second) // make sure the server started shutdown.

// This routine is the 2nd client that GETs after Stop it
// The result should be connection refused even though app is still on.
wg.Add(1)
go func() {
defer wg.Done()
_, err := http.Get("http://127.0.0.1:3000")
r.Contains(err.Error(), "refused")
}()

wg.Wait()
r.Equal(true, timerDone)
}
Loading

0 comments on commit 6ab7f0f

Please sign in to comment.