Skip to content

Commit

Permalink
Merge pull request #20 from lbryio/feature/6/jeffreypicard/hub-federa…
Browse files Browse the repository at this point in the history
…tion

Most of federation is written, need to finish udp and test
  • Loading branch information
jeffreypicard authored Nov 5, 2021
2 parents 02dbea4 + 15614c6 commit 284f825
Show file tree
Hide file tree
Showing 16 changed files with 2,047 additions and 497 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/build-short.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: 'Build Hub'
name: 'Build and Test Hub'

on:
push:
Expand All @@ -10,4 +10,5 @@ jobs:
- uses: actions/[email protected]
with:
go-version: 1.16.5
- run: go build .
- run: go build .
- run: cd server && go test -v -race
3 changes: 2 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: 'Build Hub'
name: 'Build and Test Hub'

on:
push:
Expand Down Expand Up @@ -30,3 +30,4 @@ jobs:
- run: go get github.com/golang/protobuf/protoc-gen-go google.golang.org/grpc/cmd/protoc-gen-go-grpc
- run: go build .
- run: ./protobuf/build.sh
- run: cd server && go test -v -race
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ require (
github.com/lbryio/lbry.go/v2 v2.7.2-0.20210625145058-2b155597bf57
github.com/olivere/elastic/v7 v7.0.24
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/client_model v0.2.0
golang.org/x/net v0.0.0-20210525063256-abc453219eb5 // indirect
golang.org/x/text v0.3.6
google.golang.org/genproto v0.0.0-20210524171403-669157292da3 // indirect
google.golang.org/grpc v1.38.0
google.golang.org/protobuf v1.26.0
google.golang.org/protobuf v1.27.1
gopkg.in/karalabe/cookiejar.v1 v1.0.0-20141109175019-e1490cae028c
)
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,9 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
8 changes: 8 additions & 0 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,13 @@ var (
Help: "Histogram of query times",
Buckets: HistogramBuckets,
}, []string{"method"})
PeersKnown = promauto.NewGauge(prometheus.GaugeOpts{
Name: "peers_known",
Help: "Number of peers we know about.",
})
PeersSubscribed = promauto.NewGauge(prometheus.GaugeOpts{
Name: "peers_subbed",
Help: "Number of peers that are subscribed to us.",
})
)

194 changes: 25 additions & 169 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,189 +4,45 @@ import (
"context"
"fmt"
"log"
"net"
"os"
"strings"
"time"

"github.com/akamensky/argparse"
pb "github.com/lbryio/hub/protobuf/go"
"github.com/lbryio/hub/server"
"github.com/lbryio/lbry.go/v2/extras/util"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)

const (
defaultHost = "0.0.0.0"
defaultPort = "50051"
defaultEsHost = "http://localhost"
defaultEsIndex = "claims"
defaultEsPort = "9200"
defaultRefreshDelta = 5
defaultCacheTTL = 5
)

func GetEnvironment(data []string, getkeyval func(item string) (key, val string)) map[string]string {
items := make(map[string]string)
for _, item := range data {
key, val := getkeyval(item)
items[key] = val
}
return items
}

func GetEnvironmentStandard() map[string]string {
return GetEnvironment(os.Environ(), func(item string) (key, val string) {
splits := strings.Split(item, "=")
key = splits[0]
val = splits[1]
return
})
}

/*
func makeServeCmd(parser *argparse.Parser) *argparse.Command {
serveCmd := parser.NewCommand("serve", "start the hub server")
host := serveCmd.String("", "rpchost", &argparse.Options{Required: false, Help: "host", Default: defaultHost})
port := serveCmd.String("", "rpcport", &argparse.Options{Required: false, Help: "port", Default: defaultPort})
esHost := serveCmd.String("", "eshost", &argparse.Options{Required: false, Help: "host", Default: defaultEsHost})
esPort := serveCmd.String("", "esport", &argparse.Options{Required: false, Help: "port", Default: defaultEsPort})
dev := serveCmd.Flag("", "dev", &argparse.Options{Required: false, Help: "port", Default: false})
return serveCmd
}
*/

func parseArgs(searchRequest *pb.SearchRequest) *server.Args {

environment := GetEnvironmentStandard()
parser := argparse.NewParser("hub", "hub server and client")

serveCmd := parser.NewCommand("serve", "start the hub server")
searchCmd := parser.NewCommand("search", "claim search")
debug := parser.Flag("", "debug", &argparse.Options{Required: false, Help: "enable debug logging", Default: false})

host := parser.String("", "rpchost", &argparse.Options{Required: false, Help: "RPC host", Default: defaultHost})
port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "RPC port", Default: defaultPort})
esHost := parser.String("", "eshost", &argparse.Options{Required: false, Help: "elasticsearch host", Default: defaultEsHost})
esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: defaultEsPort})
esIndex := parser.String("", "esindex", &argparse.Options{Required: false, Help: "elasticsearch index name", Default: defaultEsIndex})
refreshDelta := parser.Int("", "refresh-delta", &argparse.Options{Required: false, Help: "elasticsearch index refresh delta in seconds", Default: defaultRefreshDelta})
cacheTTL := parser.Int("", "cachettl", &argparse.Options{Required: false, Help: "Cache TTL in minutes", Default: defaultCacheTTL})

text := parser.String("", "text", &argparse.Options{Required: false, Help: "text query"})
name := parser.String("", "name", &argparse.Options{Required: false, Help: "name"})
claimType := parser.String("", "claim_type", &argparse.Options{Required: false, Help: "claim_type"})
id := parser.String("", "id", &argparse.Options{Required: false, Help: "id"})
author := parser.String("", "author", &argparse.Options{Required: false, Help: "author"})
title := parser.String("", "title", &argparse.Options{Required: false, Help: "title"})
description := parser.String("", "description", &argparse.Options{Required: false, Help: "description"})
channelId := parser.String("", "channel_id", &argparse.Options{Required: false, Help: "channel id"})
channelIds := parser.StringList("", "channel_ids", &argparse.Options{Required: false, Help: "channel ids"})

// Now parse the arguments
err := parser.Parse(os.Args)
if err != nil {
log.Fatalln(parser.Usage(err))
}

args := &server.Args{
CmdType: server.SearchCmd,
Host: *host,
Port: ":" + *port,
EsHost: *esHost,
EsPort: *esPort,
EsIndex: *esIndex,
Debug: *debug,
RefreshDelta: *refreshDelta,
CacheTTL: *cacheTTL,
}

if esHost, ok := environment["ELASTIC_HOST"]; ok {
args.EsHost = esHost
}

if !strings.HasPrefix(args.EsHost, "http") {
args.EsHost = "http://" + args.EsHost
}

if esPort, ok := environment["ELASTIC_PORT"]; ok {
args.EsPort = esPort
}

/*
Verify no invalid argument combinations
*/
if len(*channelIds) > 0 && *channelId != "" {
log.Fatal("Cannot specify both channel_id and channel_ids")
}

if serveCmd.Happened() {
args.CmdType = server.ServeCmd
} else if searchCmd.Happened() {
args.CmdType = server.SearchCmd
}

if *text != "" {
searchRequest.Text = *text
}
if *name != "" {
searchRequest.ClaimName = *name
}
if *claimType != "" {
searchRequest.ClaimType = []string{*claimType}
}
if *id != "" {
searchRequest.ClaimId = &pb.InvertibleField{Invert: false, Value: []string{*id}}
}
if *author != "" {
searchRequest.Author = *author
}
if *title != "" {
searchRequest.Title = *title
}
if *description != "" {
searchRequest.Description = *description
}
if *channelId != "" {
searchRequest.ChannelId = &pb.InvertibleField{Invert: false, Value: []string{*channelId}}
}
if len(*channelIds) > 0 {
searchRequest.ChannelId = &pb.InvertibleField{Invert: false, Value: *channelIds}
}

return args
}

func main() {

ctx := context.Background()
searchRequest := &pb.SearchRequest{}

args := parseArgs(searchRequest)
args := server.ParseArgs(searchRequest)

if args.CmdType == server.ServeCmd {

l, err := net.Listen("tcp", args.Port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}

s := server.MakeHubServer(args)
pb.RegisterHubServer(s.GrpcServer, s)
reflection.Register(s.GrpcServer)

log.Printf("listening on %s\n", l.Addr().String())
log.Println(s.Args)
go s.PromethusEndpoint("2112", "metrics")
if err := s.GrpcServer.Serve(l); err != nil {
log.Fatalf("failed to serve: %v", err)
}
// This will cancel goroutines with the server finishes.
ctxWCancel, cancel := context.WithCancel(ctx)
defer cancel()

s := server.MakeHubServer(ctxWCancel, args)
s.Run()
//l, err := net.Listen("tcp", ":"+args.Port)
//if err != nil {
// log.Fatalf("failed to listen: %v", err)
//}
//
//pb.RegisterHubServer(s.GrpcServer, s)
//reflection.Register(s.GrpcServer)
//
//log.Printf("listening on %s\n", l.Addr().String())
//log.Println(s.Args)
//if err := s.GrpcServer.Serve(l); err != nil {
// log.Fatalf("failed to serve: %v", err)
//}
return
}

conn, err := grpc.Dial("localhost"+args.Port,
conn, err := grpc.Dial("localhost:"+args.Port,
grpc.WithInsecure(),
grpc.WithBlock(),
)
Expand All @@ -197,13 +53,13 @@ func main() {

c := pb.NewHubClient(conn)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
ctxWTimeout, cancelQuery := context.WithTimeout(ctx, time.Second)
defer cancelQuery()

log.Println(args)
switch args.CmdType {
case server.SearchCmd:
r, err := c.Search(ctx, searchRequest)
r, err := c.Search(ctxWTimeout, searchRequest)
if err != nil {
log.Fatal(err)
}
Expand Down
14 changes: 14 additions & 0 deletions protobuf/definitions/hub.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,27 @@ package pb;
service Hub {
rpc Search (SearchRequest) returns (Outputs) {}
rpc Ping (EmptyMessage) returns (StringValue) {}
rpc Hello (HelloMessage) returns (HelloMessage) {}
rpc AddPeer (ServerMessage) returns (StringValue) {}
rpc PeerSubscribe (ServerMessage) returns (StringValue) {}
rpc Version (EmptyMessage) returns (StringValue) {}
rpc Features (EmptyMessage) returns (StringValue) {}
rpc Broadcast(EmptyMessage) returns (UInt32Value) {}
}

message EmptyMessage {}

message ServerMessage {
string address = 1;
string port = 2;
}

message HelloMessage {
string port = 1;
string host = 2;
repeated ServerMessage servers = 3;
}

message InvertibleField {
bool invert = 1;
repeated string value = 2;
Expand Down
Loading

0 comments on commit 284f825

Please sign in to comment.