Skip to content

Commit

Permalink
more readers and writers
Browse files Browse the repository at this point in the history
  • Loading branch information
vodolaz095 committed Aug 24, 2024
1 parent 87b9010 commit f38d3cb
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 36 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ build: deps
CGO_ENABLED=0 go build -ldflags "-X main.Version=$(ver)" -o build/stocks_broadcaster main.go

start:
go run main.go ./contrib/stocks_broadcaster.yaml
go run main.go ./contrib/local.yaml

binary: build
./build/stocks_broadcaster contrib/stocks_broadcaster.yaml
Expand Down
21 changes: 16 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,27 @@ package config

import "gopkg.in/yaml.v3"

type Input struct {
Name string `yaml:"name" validate:"required"`
Token string `yaml:"token" validate:"required"`
Figis []string `yaml:"figis" validate:"required"`
}

type Output struct {
Name string `yaml:"name" validate:"required"`
RedisURL string `yaml:"redis_url" validate:"required"`
}

type Instrument struct {
FIGI string `yaml:"figi"`
Name string `yaml:"name"`
Channel string `yaml:"channel"`
FIGI string `yaml:"figi" validate:"required"`
Name string `yaml:"name" validate:"required"`
Channel string `yaml:"channel" validate:"required"`
}

type Config struct {
Token string `yaml:"token" validate:"required"`
Inputs []Input `yaml:"inputs" validate:"required"`
Instruments []Instrument `yaml:"instruments" validate:"required"`
RedisURL string `yaml:"redis_url" validate:"required"`
Outputs []Output `yaml:"outputs" validate:"required"`
Log Log `yaml:"log" validate:"required"`
}

Expand Down
2 changes: 1 addition & 1 deletion config/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
var testConfig Config

func TestLoadFromFile(t *testing.T) {
cfg, err := LoadFromFile("./../contrib/stocks_broadcaster.yaml")
cfg, err := LoadFromFile("./../contrib/stocks_broadcaster_example.yaml")
if err != nil {
t.Error(err)
return
Expand Down
21 changes: 16 additions & 5 deletions contrib/stocks_broadcaster_example.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
token: "secret"
redis_url: "redis://127.0.0.1:6379"
log:
level: trace
to_journald: false
inputs:
- name: "default"
token: "secret"
figis:
- "BBG333333333"
- "BBG004730RP0"
- "BBG00475KKY8"

instruments:
- figi: "BBG333333333"
name: "tmos"
Expand All @@ -13,3 +16,11 @@ instruments:
- figi: "BBG00475KKY8"
name: "NVTK"
channel: "stocks/NVTK"

outputs:
- name: "container"
redis_url: "redis://127.0.0.1:6379"

log:
level: trace
to_journald: false
3 changes: 2 additions & 1 deletion internal/service/service_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ func (b *Broadcaster) Start(ctx context.Context) (err error) {
return NoReadersError
}
if cap(b.Cord) < DefaultChannelBuffer {
return fmt.Errorf("channel buffer is %v, while at least 100 is recommended", cap(b.Cord))
return fmt.Errorf("channel buffer is %v, while at least %v is recommended",
cap(b.Cord), DefaultChannelBuffer)
}
go func() {
var upd model.Update
Expand Down
8 changes: 4 additions & 4 deletions internal/transport/reader/invest_api/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,21 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/vodolaz095/stocks_broadcaster/config"
"github.com/vodolaz095/stocks_broadcaster/model"
)

const DefaultReadInterval = 10 * time.Millisecond

type Reader struct {
Description string
Connection *investapi.Client
ReadInterval time.Duration
Token string
Instruments []config.Instrument
Instruments []string
}

func (r *Reader) Name() string {
return "InvestAPI reader"
return "InvestAPI reader " + r.Description
}

func (r *Reader) Ping(ctx context.Context) error {
Expand All @@ -47,7 +47,7 @@ func (r *Reader) Start(ctx context.Context, updateFeed chan model.Update) (err e
var upd model.Update
var instruments []*investapi.LastPriceInstrument
for i := range r.Instruments {
instruments = append(instruments, &investapi.LastPriceInstrument{Figi: r.Instruments[i].FIGI})
instruments = append(instruments, &investapi.LastPriceInstrument{Figi: r.Instruments[i]})
}
// подписываемся на цену крайней сделки по акциям
request := investapi.MarketDataServerSideStreamRequest{
Expand Down
5 changes: 3 additions & 2 deletions internal/transport/writer/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import (
)

type Writer struct {
Client *redis.Client
Description string
Client *redis.Client
}

func (w *Writer) Name() string {
return "redis publisher"
return w.Description
}

func (w *Writer) Ping(ctx context.Context) error {
Expand Down
44 changes: 27 additions & 17 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,33 +63,43 @@ func main() {
)

// configure readers
investApiClient, err := investapi.New(cfg.Token)
if err != nil {
log.Fatal().Err(err).Msgf("error connecting invest api: %s", err)
}
iaReader := investapi_reader.Reader{
Connection: investApiClient,
ReadInterval: investapi_reader.DefaultReadInterval,
Instruments: cfg.Instruments,
var readers []reader.StocksReader
for i := range cfg.Inputs {
investApiClient, err1 := investapi.New(cfg.Inputs[i].Token)
if err != nil {
log.Fatal().
Err(err1).
Msgf("error connecting invest api: %s", err1)
}
readers = append(readers, &investapi_reader.Reader{
Description: cfg.Inputs[i].Name,
Connection: investApiClient,
ReadInterval: investapi_reader.DefaultReadInterval,
Instruments: cfg.Inputs[i].Figis,
})
}

// configure writers
redisOpts, err := redis.ParseURL(cfg.RedisURL)
if err != nil {
log.Fatal().Err(err).Msgf("error parsing redis connection string %s: %s", cfg.RedisURL, err)
}
client := redis.NewClient(redisOpts)
rw := redisWriter.Writer{
Client: client,
var writers []writer.StocksWriter
for i := range cfg.Outputs {
redisOpts, err2 := redis.ParseURL(cfg.Outputs[i].RedisURL)
if err2 != nil {
log.Fatal().Err(err2).Msgf("error parsing redis connection string %s from %v: %s",
cfg.Outputs[i].RedisURL, i, err2)
}
writers = append(writers, &redisWriter.Writer{
Description: cfg.Outputs[i].Name,
Client: redis.NewClient(redisOpts),
})
}

// configure service
srv := service.Broadcaster{
FigiName: make(map[string]string, 0),
FigiChannel: make(map[string]string, 0),
Cord: make(chan model.Update, service.DefaultChannelBuffer),
Readers: []reader.StocksReader{&iaReader}, // todo - MORE!
Writers: []writer.StocksWriter{&rw}, // todo - MORE!
Readers: readers, // todo - MORE!
Writers: writers, // todo - MORE!
}
// configure service routing
for i := range cfg.Instruments {
Expand Down

0 comments on commit f38d3cb

Please sign in to comment.