Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make refresh tasks controllable through UI and API #148

Merged
merged 2 commits into from
Jul 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,13 @@ Proxy probing component.

* `enable_http_rescue` - experimental feature to enable rescuing HTTP proxies, that were presented as SOCKS5 or HTTPS. Detected based on protocol probe heuristics. Defaults to false.

## refresher

Source refresh component.

* `enabled` - run the refresher. Enabled by default.
* `max_scheduled` - number of sources to refresh at the same time. Defaults to 5.

## mitm

HTTP proxy frontend.
Expand Down Expand Up @@ -231,11 +238,19 @@ Get information about refresh status for all sources

Get 20 last used proxies

## POST `/api/refresher/{source_name}`

Start refreshing the source

## DELETE `/api/refresher/{source_name}`

Stop refreshing the source

## GET `/api/history`

Get 100 last forwarding attempts

## GET `/api/history/:id`
## GET `/api/history/{id}`

Get sanitized HTTP response from forwarding attempt

Expand Down
164 changes: 136 additions & 28 deletions refresher/refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package refresher

import (
"context"
"fmt"
"net/http"
"os"
"sort"
Expand All @@ -11,7 +12,6 @@ import (
"github.com/nfx/slrp/app"
"github.com/nfx/slrp/pmux"
"github.com/nfx/slrp/pool"
"github.com/nfx/slrp/probe"
"github.com/nfx/slrp/sources"
"github.com/nfx/slrp/stats"

Expand Down Expand Up @@ -80,17 +80,32 @@ func (s *status) EstFinish(queued int) time.Time {

type plan map[int]*status

type task struct {
source *sources.Source
cancel func()
}

type req struct {
name string
cmd string
err chan error
}

type Refresher struct {
probe probeContract
pool poolContract
stats statsContract
client *http.Client
next atomic.Value
progress chan progress
finish chan finish
snapshot chan chan plan
sources func() []sources.Source
plan plan
probe probeContract
pool poolContract
stats statsContract
client *http.Client
next atomic.Value
progress chan progress
finish chan finish
snapshot chan chan plan
sources func() []sources.Source
reqs chan req
active map[int]*task
plan plan
enabled bool
maxScheduled int
}

type probeContract interface {
Expand All @@ -108,15 +123,19 @@ type statsContract interface {
Snapshot() stats.Sources
}

func NewRefresher(stats *stats.Stats, pool *pool.Pool, probe *probe.Probe) *Refresher {
func NewRefresher(stats *stats.Stats, pool *pool.Pool, probe probeContract) *Refresher {
return &Refresher{
probe: probe,
pool: pool,
stats: stats,
finish: make(chan finish, 1),
progress: make(chan progress),
snapshot: make(chan chan plan),
plan: plan{},
probe: probe,
pool: pool,
stats: stats,
finish: make(chan finish, 1),
progress: make(chan progress),
snapshot: make(chan chan plan),
plan: plan{},
reqs: make(chan req),
active: map[int]*task{},
enabled: true,
maxScheduled: 5,
sources: func() []sources.Source {
return sources.Sources
},
Expand All @@ -126,6 +145,12 @@ func NewRefresher(stats *stats.Stats, pool *pool.Pool, probe *probe.Probe) *Refr
}
}

func (ref *Refresher) Configure(c app.Config) error {
ref.enabled = c.BoolOr("enabled", true)
ref.maxScheduled = c.IntOr("max_scheduled", 5)
return nil
}

func (ref *Refresher) Start(ctx app.Context) {
go ref.main(ctx)
}
Expand Down Expand Up @@ -174,8 +199,17 @@ func (ref *Refresher) main(ctx app.Context) {
}
log := app.Log.From(f.ctx)
log.Info().Err(f.Err).Msg("finished refresh")
_, ok = ref.active[f.Source]
if ok {
delete(ref.active, f.Source)
}
ctx.Heartbeat()
case r := <-ref.reqs:
r.err <- ref.handleReq(r)
case <-start:
if !ref.enabled {
continue
}
next = ref.checkSources(ctx.Ctx(), next)
ref.next.Store(next)
log.Trace().
Expand All @@ -186,17 +220,92 @@ func (ref *Refresher) main(ctx app.Context) {
}
}

func (ref *Refresher) handleReq(r req) error {
s := sources.ByName(r.name)
if s.Name() == "unknown" {
return fmt.Errorf("invalid source '%s'", r.name)
}
ctx := context.Background()
ctx = app.Log.WithStr(ctx, "source", s.Name())
switch r.cmd {
case "start":
return ref.start(ctx, s)
case "stop":
return ref.stop(ctx, s)
default:
return fmt.Errorf("invalid command: %s", r.cmd)
}
}

func (ref *Refresher) Snapshot() plan {
out := make(chan plan)
defer close(out)
ref.snapshot <- out
return <-out
}

func (ref *Refresher) HttpGet(_ *http.Request) (interface{}, error) {
func (ref *Refresher) HttpGet(_ *http.Request) (any, error) {
return ref.upcoming(), nil
}

// start the source
func (ref *Refresher) HttpPostByID(name string, r *http.Request) (any, error) {
res := make(chan error)
ref.reqs <- req{
name: name,
cmd: "start",
err: res,
}
return nil, <-res
}

func (ref *Refresher) start(ctx context.Context, source sources.Source) error {
log := app.Log.From(ctx)
log.Info().Msg("starting")
_, ok := ref.active[source.ID]
if ok {
return fmt.Errorf("source %s is running", source.Name())
}
client := ref.client
if source.Seed {
// TODO: wrap with history
client = http.DefaultClient
}
tctx, cancel := context.WithCancel(ctx)
ref.active[source.ID] = &task{
source: &source,
cancel: cancel,
}
go ref.refresh(tctx, client, source)
return nil
}

// stop the source
func (ref *Refresher) HttpDeletetByID(name string, r *http.Request) (any, error) {
res := make(chan error)
ref.reqs <- req{
name: name,
cmd: "stop",
err: res,
}
return nil, <-res
}

func (ref *Refresher) stop(ctx context.Context, source sources.Source) error {
log := app.Log.From(ctx)
log.Info().Msg("stopping")
t, ok := ref.active[source.ID]
if !ok {
return fmt.Errorf("source %s was not running", source.Name())
}
if t.cancel == nil {
return fmt.Errorf("cannot cancel %s", source.Name())
}
t.cancel()
delete(ref.active, source.ID)
return nil
}

type upcoming struct {
Source int
Delay time.Duration
Expand Down Expand Up @@ -250,14 +359,18 @@ var refreshDelay = 1 * time.Minute

func (ref *Refresher) checkSources(ctx context.Context, trigger time.Time) time.Time {
minSourceFrequency := 60 * time.Minute
for _, v := range ref.sources() {
srcs := ref.sources()
for _, v := range srcs {
if v.Frequency < minSourceFrequency {
minSourceFrequency = v.Frequency
}
}
nextTrigger := time.Now().Add(minSourceFrequency)
snapshot := ref.stats.Snapshot()
for _, s := range ref.sources() {
for _, s := range srcs {
if len(ref.active) > ref.maxScheduled {
return trigger.Add(1 * time.Minute)
}
sctx := app.Log.WithStr(ctx, "source", s.Name())
log := app.Log.From(sctx)
if s.Feed == nil {
Expand Down Expand Up @@ -290,12 +403,7 @@ func (ref *Refresher) checkSources(ctx context.Context, trigger time.Time) time.
continue
}
log.Trace().Msg("scheduling refresh")
client := ref.client
if s.Seed {
// TODO: wrap with history
client = http.DefaultClient
}
go ref.refresh(sctx, client, s)
ref.start(sctx, s)
}
// TODO: maybe bring back nextTrigger someday
return trigger.Add(1 * time.Minute)
Expand Down
4 changes: 4 additions & 0 deletions ui/src/App.css
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ tr.probe-running {
background-repeat: no-repeat;
}

.sr-only {
display: none;
}

.col-provider a {
width: 150px;
display: block;
Expand Down
45 changes: 39 additions & 6 deletions ui/src/Dashboard.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,43 @@ function Cols(props: Summary) {
);
}

function StatusIcon(props: {
source: string;
state: string;
failure: string;
}) {
const [hover, setHover] = useState<boolean>(false);
const startTitle = props.failure ? `Restart from ${props.failure}` : 'Start';
const startRefresh = <i className="bi bi-collection-play-fill text-success"
title={startTitle} onClick={() => {
http
.post(`/refresher/${props.source}`)
.then(_ => props.state = "running");
}} />;

const stopRefresh = <i className="bi bi-stop-circle-fill text-danger" title="Stop" onClick={() => {
http
.delete(`/refresher/${props.source}`)
.then(_ => props.state = "idle");
}} />;

const idle = hover ? startRefresh : <i className="bi bi-alarm text-muted" title="Idle" />

let icons: Record<string, ReactNode> = {
running: hover ? stopRefresh : <i className="spinner-border spinner-border-sm text-success">
<span className="sr-only">.</span>
</i>,
failed: hover ? startRefresh : <i className="bi bi-emoji-dizzy-fill" title={props.failure} />,
idle: idle,
"": idle,
};
return <span
onMouseOver={() => setHover(true)}
onMouseLeave={() => setHover(false)}>
{icons[props.state]}&nbsp;
</span>;
}

function Probe(props: Source) {
const { Name, State, Progress, Failure, EstFinish, NextRefresh, UrlPrefix, Homepage } = props;
const style: Record<string, string | number> = {};
Expand All @@ -134,15 +171,11 @@ function Probe(props: Source) {
style.backgroundImage = lg;
}
let refresh = running ? <TimeDiff ts={EstFinish} title="Estimated finish" /> : <TimeDiff ts={NextRefresh} title="Next Refresh" />;
let icons: Record<string, ReactNode> = {
running: <i className="spinner-border spinner-border-sm text-success" />,
failed: <i className="bi bi-emoji-dizzy-fill" title={Failure} />,
idle: <i className="bi bi-alarm text-muted" title="Idle" />
};

return (
<tr className={rowClass} style={style}>
<td>
{icons[State]}&nbsp;
<StatusIcon source={Name} state={State} failure={Failure} />
<a href={`/history?filter=URL ~ "${UrlPrefix}" AND StatusCode < 500`} className="app-link" target="_blank" rel="noreferrer">
{Name}
</a>
Expand Down
10 changes: 7 additions & 3 deletions ui/src/History.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ function Request(history: FilteredRequest) {
<td>
<span className="request">
{history.Method}{" "}
<a className="app-link" href={`http://localhost:8089/api/history/${history.ID}?format=text`} rel="noreferrer" target="_blank">
<a className="app-link" href={`/api/history/${history.ID}?format=text`} rel="noreferrer" target="_blank">
<abbr title={history.URL}>{path}</abbr>
</a>
<sup>
Expand All @@ -62,10 +62,14 @@ function Request(history: FilteredRequest) {
{history.StatusCode === 200 ? 200 : <abbr title={history.Status}>{history.StatusCode}</abbr>} <sup>{history.Attempt}</sup>
</td>
<td className="text-muted proxy">
<a className="link-primary app-link" href={`/history?filter=Proxy:"${Proxy}"`}>
<a className="link-primary app-link" href={`/history?filter=Proxy:"${history.Proxy}"`}>
{history.Proxy}
</a>{" "}
<sup>{history.Appeared}</sup>
<sup>
<a className="link-primary app-link" href={`/proxies?filter=Proxy:"${history.Proxy}"`}>
{history.Appeared}
</a>
</sup>
</td>
<td className="size">{convertSize(history.Size)}</td>
<td className="took">{history.Took}s</td>
Expand Down
1 change: 1 addition & 0 deletions ui/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"target": "ESNext",
"lib": ["dom", "dom.iterable", "esnext"],
"types": ["vite/client"],
"typeRoots": ["./node_modules/@types/", "./types", "./node_modules"],
"allowJs": false,
"skipLibCheck": false,
"esModuleInterop": false,
Expand Down