From f04c4da72c7c3924de91c53007a2f54bdf664662 Mon Sep 17 00:00:00 2001
From: Ettore Di Giacinto
Date: Fri, 5 Jul 2024 19:22:23 +0200
Subject: [PATCH] Wip p2p enhancements

---
 .github/release.yml                    |   3 +
 core/cli/worker/worker_p2p.go          |   2 +-
 core/http/endpoints/localai/welcome.go |   2 +
 core/http/routes/ui.go                 |  26 ++++-
 core/http/views/chat.html              |   2 +-
 core/http/views/p2p.html               |  33 ++++++
 core/http/views/partials/navbar.html   |   6 ++
 core/http/views/talk.html              |   2 +-
 core/p2p/node.go                       |  38 +++++++
 core/p2p/p2p.go                        | 137 ++++++++++++++++++------
 core/p2p/p2p_disabled.go               |   8 ++
 docs/static/install.sh                 |  43 ++++++--
 12 files changed, 255 insertions(+), 47 deletions(-)
 create mode 100644 core/http/views/p2p.html
 create mode 100644 core/p2p/node.go

diff --git a/.github/release.yml b/.github/release.yml
index 8c2c11f9f7a0..eee7f6ec3d9a 100644
--- a/.github/release.yml
+++ b/.github/release.yml
@@ -13,6 +13,9 @@ changelog:
       labels:
         - bug
         - regression
+    - title: "🖧 P2P area"
+      labels:
+        - area/p2p
     - title: Exciting New Features 🎉
       labels:
         - Semver-Minor
diff --git a/core/cli/worker/worker_p2p.go b/core/cli/worker/worker_p2p.go
index 4651c36e5553..787f8f324abb 100644
--- a/core/cli/worker/worker_p2p.go
+++ b/core/cli/worker/worker_p2p.go
@@ -20,7 +20,7 @@ import (
 
 type P2P struct {
 	WorkerFlags   `embed:""`
-	Token         string `env:"LOCALAI_TOKEN,TOKEN" help:"JSON list of galleries"`
+	Token         string `env:"LOCALAI_TOKEN,LOCALAI_P2P_TOKEN,TOKEN" help:"P2P token to use"`
 	NoRunner      bool   `env:"LOCALAI_NO_RUNNER,NO_RUNNER" help:"Do not start the llama-cpp-rpc-server"`
 	RunnerAddress string `env:"LOCALAI_RUNNER_ADDRESS,RUNNER_ADDRESS" help:"Address of the llama-cpp-rpc-server"`
 	RunnerPort    string `env:"LOCALAI_RUNNER_PORT,RUNNER_PORT" help:"Port of the llama-cpp-rpc-server"`
diff --git a/core/http/endpoints/localai/welcome.go b/core/http/endpoints/localai/welcome.go
index fa00e90047ac..34a2d975c10d 100644
--- a/core/http/endpoints/localai/welcome.go
+++ b/core/http/endpoints/localai/welcome.go
@@ -4,6 +4,7 @@ import (
 	"github.com/gofiber/fiber/v2"
 	"github.com/mudler/LocalAI/core/config"
 	"github.com/mudler/LocalAI/core/gallery"
+	"github.com/mudler/LocalAI/core/p2p"
 	"github.com/mudler/LocalAI/internal"
 	"github.com/mudler/LocalAI/pkg/model"
 )
@@ -33,6 +34,7 @@ func WelcomeEndpoint(appConfig *config.ApplicationConfig,
 		"Models":            models,
 		"ModelsConfig":      backendConfigs,
 		"GalleryConfig":     galleryConfigs,
+		"IsP2PEnabled":      p2p.IsP2PEnabled(),
 		"ApplicationConfig": appConfig,
 		"ProcessingModels":  processingModels,
 		"TaskTypes":         taskTypes,
diff --git a/core/http/routes/ui.go b/core/http/routes/ui.go
index 4618f032598e..043ac23d774a 100644
--- a/core/http/routes/ui.go
+++ b/core/http/routes/ui.go
@@ -10,6 +10,7 @@ import (
 	"github.com/mudler/LocalAI/core/gallery"
 	"github.com/mudler/LocalAI/core/http/elements"
 	"github.com/mudler/LocalAI/core/http/endpoints/localai"
+	"github.com/mudler/LocalAI/core/p2p"
 	"github.com/mudler/LocalAI/core/services"
 	"github.com/mudler/LocalAI/internal"
 	"github.com/mudler/LocalAI/pkg/model"
@@ -53,6 +54,20 @@ func RegisterUIRoutes(app *fiber.App,
 
 	app.Get("/", auth, localai.WelcomeEndpoint(appConfig, cl, ml, modelStatus))
 
+	if p2p.IsP2PEnabled() {
+		app.Get("/p2p", auth, func(c *fiber.Ctx) error {
+			summary := fiber.Map{
+				"Title":        "LocalAI - P2P dashboard",
+				"Version":      internal.PrintableVersion(),
+				"Nodes":        p2p.GetAvailableNodes(),
+				"IsP2PEnabled": p2p.IsP2PEnabled(),
+			}
+
+			// Render index
+			return c.Render("views/p2p", summary)
+		})
+	}
+
 	// Show the Models page (all models)
 	app.Get("/browse", auth, func(c *fiber.Ctx) error {
 		term := c.Query("term")
@@ -87,7 +102,9 @@ func RegisterUIRoutes(app *fiber.App,
 			"AllTags":          tags,
 			"ProcessingModels": processingModelsData,
 			"AvailableModels":  len(models),
-			"TaskTypes":        taskTypes,
+			"IsP2PEnabled":     p2p.IsP2PEnabled(),
+
+			"TaskTypes": taskTypes,
 			//	"ApplicationConfig": appConfig,
 		}
 
@@ -243,6 +260,7 @@ func RegisterUIRoutes(app *fiber.App,
 			"ModelsConfig": backendConfigs,
 			"Model":        c.Params("model"),
 			"Version":      internal.PrintableVersion(),
+			"IsP2PEnabled": p2p.IsP2PEnabled(),
 		}
 
 		// Render index
@@ -261,6 +279,7 @@ func RegisterUIRoutes(app *fiber.App,
 			"Title":        "LocalAI - Talk",
 			"ModelsConfig": backendConfigs,
 			"Model":        backendConfigs[0].ID,
+			"IsP2PEnabled": p2p.IsP2PEnabled(),
 			"Version":      internal.PrintableVersion(),
 		}
 
@@ -282,6 +301,7 @@ func RegisterUIRoutes(app *fiber.App,
 			"ModelsConfig": backendConfigs,
 			"Model":        backendConfigs[0].ID,
 			"Version":      internal.PrintableVersion(),
+			"IsP2PEnabled": p2p.IsP2PEnabled(),
 		}
 
 		// Render index
@@ -296,6 +316,7 @@ func RegisterUIRoutes(app *fiber.App,
 			"ModelsConfig": backendConfigs,
 			"Model":        c.Params("model"),
 			"Version":      internal.PrintableVersion(),
+			"IsP2PEnabled": p2p.IsP2PEnabled(),
 		}
 
 		// Render index
@@ -316,6 +337,7 @@ func RegisterUIRoutes(app *fiber.App,
 			"ModelsConfig": backendConfigs,
 			"Model":        backendConfigs[0].Name,
 			"Version":      internal.PrintableVersion(),
+			"IsP2PEnabled": p2p.IsP2PEnabled(),
 		}
 
 		// Render index
@@ -330,6 +352,7 @@ func RegisterUIRoutes(app *fiber.App,
 			"ModelsConfig": backendConfigs,
 			"Model":        c.Params("model"),
 			"Version":      internal.PrintableVersion(),
+			"IsP2PEnabled": p2p.IsP2PEnabled(),
 		}
 
 		// Render index
@@ -349,6 +372,7 @@ func RegisterUIRoutes(app *fiber.App,
 			"Title":        "LocalAI - Generate audio with " + backendConfigs[0].Name,
 			"ModelsConfig": backendConfigs,
 			"Model":        backendConfigs[0].Name,
+			"IsP2PEnabled": p2p.IsP2PEnabled(),
 			"Version":      internal.PrintableVersion(),
 		}
 
diff --git a/core/http/views/chat.html b/core/http/views/chat.html
index 1e490b659481..79c395708e45 100644
--- a/core/http/views/chat.html
+++ b/core/http/views/chat.html
@@ -37,7 +37,7 @@
-	{{template "views/partials/navbar"}}
+	{{template "views/partials/navbar" .}}
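Why the trailing dot matters: without it, the partial is invoked with nil data and cannot see fields like .IsP2PEnabled. A minimal, self-contained sketch using the standard html/template package, which the Fiber HTML engine builds on; the template string and values below are illustrative, not taken from the repo:

    package main

    import (
    	"html/template"
    	"os"
    )

    // {{template "navbar"}} would invoke the partial with nil data, so
    // .IsP2PEnabled is unavailable inside it; {{template "navbar" .}}
    // forwards the current data context, which is what the chat/talk views
    // need now that the navbar reads .IsP2PEnabled.
    const page = `{{define "navbar"}}{{if .IsP2PEnabled}}[P2P] {{end}}[API]{{end}}
    {{template "navbar" .}}`

    func main() {
    	t := template.Must(template.New("page").Parse(page))
    	// Prints "[P2P] [API]" because the dot is forwarded to the partial.
    	t.Execute(os.Stdout, map[string]any{"IsP2PEnabled": true})
    }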
diff --git a/core/http/views/p2p.html b/core/http/views/p2p.html
new file mode 100644
index 000000000000..f4fba684c2d1
--- /dev/null
+++ b/core/http/views/p2p.html
@@ -0,0 +1,33 @@
+
+
+{{template "views/partials/head" .}}
+
+
+
+	{{template "views/partials/navbar" .}}
+
+
+
+
+
+	P2P
+
+
+
+
+
+	{{ range .Nodes }}
+
+	{{ end }}
+
+
+
+
+
+
+	{{template "views/partials/footer" .}}
+
+
+
+
diff --git a/core/http/views/partials/navbar.html b/core/http/views/partials/navbar.html
index caa1f3b77c9f..85ac0a0a3ad8 100644
--- a/core/http/views/partials/navbar.html
+++ b/core/http/views/partials/navbar.html
@@ -21,6 +21,9 @@
 Generate images
 TTS
 Talk
+{{ if .IsP2PEnabled }}
+P2P
+{{ end }}
 API
@@ -34,6 +37,9 @@
 Generate images
 TTS
 Talk
+{{ if .IsP2PEnabled }}
+P2P
+{{ end }}
 API
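The new dashboard view iterates the "Nodes" slice handed over by the /p2p route above. A minimal sketch of that data flow with the standard html/template package; the struct mirrors core/p2p.NodeData from this patch, while the sample values are made up:

    package main

    import (
    	"html/template"
    	"os"
    	"time"
    )

    // Mirrors core/p2p.NodeData so the template field names line up.
    type NodeData struct {
    	Name          string
    	ID            string
    	TunnelAddress string
    	LastSeen      time.Time
    }

    // Same liveness rule as the patch: seen within the last 40 seconds.
    func (d NodeData) IsOnline() bool {
    	return time.Since(d.LastSeen) < 40*time.Second
    }

    const view = `{{ range .Nodes }}{{ .ID }} {{ .TunnelAddress }} online={{ .IsOnline }}
    {{ end }}`

    func main() {
    	t := template.Must(template.New("p2p").Parse(view))
    	t.Execute(os.Stdout, map[string]any{"Nodes": []NodeData{
    		{ID: "host-a", TunnelAddress: "127.0.0.1:40001", LastSeen: time.Now()},
    		{ID: "host-b", TunnelAddress: "127.0.0.1:40002", LastSeen: time.Now().Add(-2 * time.Minute)},
    	}})
    }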
diff --git a/core/http/views/talk.html b/core/http/views/talk.html
index d0caedaba6ad..afb494e953a5 100644
--- a/core/http/views/talk.html
+++ b/core/http/views/talk.html
@@ -10,7 +10,7 @@
-	{{template "views/partials/navbar"}}
+	{{template "views/partials/navbar" .}}
diff --git a/core/p2p/node.go b/core/p2p/node.go
new file mode 100644
index 000000000000..69808483388b
--- /dev/null
+++ b/core/p2p/node.go
@@ -0,0 +1,38 @@
+package p2p
+
+import (
+	"sync"
+	"time"
+)
+
+type NodeData struct {
+	Name          string
+	ID            string
+	TunnelAddress string
+	LastSeen      time.Time
+}
+
+func (d NodeData) IsOnline() bool {
+	now := time.Now()
+	// if the node was seen in the last 40 seconds, it's online
+	return now.Sub(d.LastSeen) < 40*time.Second
+}
+
+var mu sync.Mutex
+var nodes = map[string]NodeData{}
+
+func GetAvailableNodes() []NodeData {
+	mu.Lock()
+	defer mu.Unlock()
+	availableNodes := []NodeData{}
+	for _, v := range nodes {
+		availableNodes = append(availableNodes, v)
+	}
+	return availableNodes
+}
+
+func AddNode(node NodeData) {
+	mu.Lock()
+	defer mu.Unlock()
+	nodes[node.ID] = node
+}
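To make the intended semantics of this small registry concrete, here is a minimal, self-contained usage sketch (hypothetical driver code, not part of the patch; it only uses the API introduced above):

    package main

    import (
    	"fmt"
    	"time"

    	"github.com/mudler/LocalAI/core/p2p"
    )

    func main() {
    	// AddNode keys entries by ID, so re-announcing the same worker simply
    	// overwrites the previous entry and refreshes LastSeen.
    	p2p.AddNode(p2p.NodeData{
    		Name:          "worker-1",
    		ID:            "worker-1",
    		TunnelAddress: "127.0.0.1:34567",
    		LastSeen:      time.Now(),
    	})

    	// GetAvailableNodes returns a locked snapshot of the registry;
    	// IsOnline is a pure liveness heuristic over LastSeen (seen within
    	// the last 40 seconds).
    	for _, n := range p2p.GetAvailableNodes() {
    		fmt.Println(n.ID, n.TunnelAddress, "online:", n.IsOnline())
    	}
    }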
diff --git a/core/p2p/p2p.go b/core/p2p/p2p.go
index 67196c977495..6def70edad60 100644
--- a/core/p2p/p2p.go
+++ b/core/p2p/p2p.go
@@ -11,18 +11,18 @@ import (
 	"net"
 	"os"
 	"strings"
+	"sync"
 	"time"
 
+	"github.com/ipfs/go-log"
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/mudler/LocalAI/pkg/utils"
+	"github.com/mudler/edgevpn/pkg/config"
 	"github.com/mudler/edgevpn/pkg/node"
 	"github.com/mudler/edgevpn/pkg/protocol"
+	"github.com/mudler/edgevpn/pkg/services"
 	"github.com/mudler/edgevpn/pkg/types"
 	"github.com/phayes/freeport"
-
-	"github.com/ipfs/go-log"
-	"github.com/mudler/edgevpn/pkg/config"
-	"github.com/mudler/edgevpn/pkg/services"
 	zlog "github.com/rs/zerolog/log"
 
 	"github.com/mudler/edgevpn/pkg/logger"
@@ -34,6 +34,15 @@ func GenerateToken() string {
 	return newData.Base64()
 }
 
+func IsP2PEnabled() bool {
+	return true
+}
+
+func nodeID() string {
+	hostname, _ := os.Hostname()
+	return hostname
+}
+
 func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, service string) error {
 
 	zlog.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr)
@@ -53,16 +62,16 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, service string) error {
 		10*time.Second,
 		func() {
 			// Retrieve current ID for ip in the blockchain
-			_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String())
+			//_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String())
 			// If mismatch, update the blockchain
-			if !found {
-				updatedMap := map[string]interface{}{}
-				updatedMap[node.Host().ID().String()] = &types.User{
-					PeerID:    node.Host().ID().String(),
-					Timestamp: time.Now().String(),
-				}
-				ledger.Add(protocol.UsersLedgerKey, updatedMap)
+			//if !found {
+			updatedMap := map[string]interface{}{}
+			updatedMap[node.Host().ID().String()] = &types.User{
+				PeerID:    node.Host().ID().String(),
+				Timestamp: time.Now().String(),
 			}
+			ledger.Add(protocol.UsersLedgerKey, updatedMap)
+			// }
 		},
 	)
 
@@ -142,19 +151,32 @@ func LLamaCPPRPCServerDiscoverer(ctx context.Context, token string) error {
 	if err != nil {
 		return err
 	}
-
+	// TODO: discoveryTunnels should return all the nodes that are available?
+	// That way we would update availableNodes here instead of appending.
+	// E.g. we have a LastSeen field in NodeData that is updated in discoveryTunnels
+	// each time the node is seen. In that case the function below should be
+	// idempotent and just keep track of the nodes.
	go func() {
-		totalTunnels := []string{}
 		for {
 			select {
 			case <-ctx.Done():
 				zlog.Error().Msg("Discoverer stopped")
 				return
 			case tunnel := <-tunnels:
+				AddNode(tunnel)
-				totalTunnels = append(totalTunnels, tunnel)
-				os.Setenv("LLAMACPP_GRPC_SERVERS", strings.Join(totalTunnels, ","))
-				zlog.Debug().Msgf("setting LLAMACPP_GRPC_SERVERS to %s", strings.Join(totalTunnels, ","))
+				var tunnelAddresses []string
+				// read a locked snapshot via GetAvailableNodes() rather than
+				// ranging over the nodes map without holding its mutex
+				for _, v := range GetAvailableNodes() {
+					if v.IsOnline() {
+						tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
+					}
+				}
+				tunnelEnvVar := strings.Join(tunnelAddresses, ",")
+
+				os.Setenv("LLAMACPP_GRPC_SERVERS", tunnelEnvVar)
+				zlog.Debug().Msgf("setting LLAMACPP_GRPC_SERVERS to %s", tunnelEnvVar)
+
+				zlog.Info().Msgf("Node %s available", tunnel.ID)
 			}
 		}
 	}()
@@ -162,8 +184,8 @@ func LLamaCPPRPCServerDiscoverer(ctx context.Context, token string) error {
 	return nil
 }
 
-func discoveryTunnels(ctx context.Context, token string) (chan string, error) {
-	tunnels := make(chan string)
+func discoveryTunnels(ctx context.Context, token string) (chan NodeData, error) {
+	tunnels := make(chan NodeData)
 
 	nodeOpts, err := newNodeOpts(token)
 	if err != nil {
@@ -184,8 +206,14 @@ func discoveryTunnels(ctx context.Context, token string) (chan NodeData, error) {
 	}
 
 	// get new services, allocate and return to the channel
+	// TODO: a function ensureServices that:
+	// - starts a service if not started yet, provided the worker is Online
+	// - checks that workers are Online, and if not, cancels the context of allocateLocalService
+	// - discoveryTunnels should return all the nodes and the addresses associated with them
+	// - the caller should then take care of the fact that we always return fresh information
 	go func() {
-		emitted := map[string]bool{}
 		for {
 			select {
 			case <-ctx.Done():
@@ -196,19 +224,15 @@ func discoveryTunnels(ctx context.Context, token string) (chan NodeData, error) {
 				zlog.Debug().Msg("Searching for workers")
 
 				data := ledger.LastBlock().Storage["services_localai"]
-				for k := range data {
+				for k, v := range data {
 					zlog.Info().Msgf("Found worker %s", k)
-					if _, found := emitted[k]; !found {
-						emitted[k] = true
-						//discoveredPeers <- k
-						port, err := freeport.GetFreePort()
-						if err != nil {
-							fmt.Print(err)
-						}
-						tunnelAddress := fmt.Sprintf("127.0.0.1:%d", port)
-						go allocateLocalService(ctx, n, tunnelAddress, k)
-						tunnels <- tunnelAddress
+					nd := &NodeData{}
+					if err := v.Unmarshal(nd); err != nil {
+						zlog.Error().Msg("cannot unmarshal node data")
+						continue
 					}
+					ensureService(ctx, n, nd, k)
+					tunnels <- *nd
 				}
 			}
 		}
@@ -217,6 +241,41 @@ func discoveryTunnels(ctx context.Context, token string) (chan NodeData, error) {
 	return tunnels, err
 }
 
+type nodeServiceData struct {
+	NodeData   NodeData
+	CancelFunc context.CancelFunc
+}
+
+var service = map[string]nodeServiceData{}
+var muservice sync.Mutex
+
+func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string) {
+	muservice.Lock()
+	defer muservice.Unlock()
+	if ndService, found := service[nd.Name]; !found {
+		newCtxm, cancel := context.WithCancel(ctx)
+		service[nd.Name] = nodeServiceData{
+			NodeData:   *nd,
+			CancelFunc: cancel,
+		}
+		// Start the service
+		port, err := freeport.GetFreePort()
+		if err != nil {
+			fmt.Print(err)
+		}
+		tunnelAddress := fmt.Sprintf("127.0.0.1:%d", port)
+		nd.TunnelAddress = tunnelAddress
+		go allocateLocalService(newCtxm, n, tunnelAddress, sserv)
+	} else {
+		// Check if the service is still alive; if not, cancel its context
+		if !ndService.NodeData.IsOnline() {
+			ndService.CancelFunc()
+			delete(service, nd.Name)
+		}
+	}
+}
+
 // This is the P2P worker main
 func BindLLamaCPPWorker(ctx context.Context, host, port, token string) error {
 	llger := logger.New(log.LevelFatal)
@@ -248,16 +307,20 @@ func BindLLamaCPPWorker(ctx context.Context, host, port, token string) error {
 
 	ledger.Announce(
 		ctx,
-		10*time.Second,
+		20*time.Second,
 		func() {
 			// Retrieve current ID for ip in the blockchain
-			_, found := ledger.GetKey("services_localai", name)
+			//_, found := ledger.GetKey("services_localai", name)
 			// If mismatch, update the blockchain
-			if !found {
-				updatedMap := map[string]interface{}{}
-				updatedMap[name] = "p2p"
-				ledger.Add("services_localai", updatedMap)
+			//if !found {
+			updatedMap := map[string]interface{}{}
+			updatedMap[name] = &NodeData{
+				Name:     name,
+				LastSeen: time.Now(),
+				ID:       nodeID(),
 			}
+			ledger.Add("services_localai", updatedMap)
+			// }
 		},
 	)
diff --git a/core/p2p/p2p_disabled.go b/core/p2p/p2p_disabled.go
index 59314aa629f0..169b8ff5f6f3 100644
--- a/core/p2p/p2p_disabled.go
+++ b/core/p2p/p2p_disabled.go
@@ -19,3 +19,11 @@ func LLamaCPPRPCServerDiscoverer(ctx context.Context, token string) error {
 func BindLLamaCPPWorker(ctx context.Context, host, port, token string) error {
 	return fmt.Errorf("not implemented")
 }
+
+func GetAvailableNodes() []NodeData {
+	return []NodeData{}
+}
+
+func IsP2PEnabled() bool {
+	return false
+}
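p2p.go hardcodes IsP2PEnabled() to true while p2p_disabled.go returns false, so the pair is evidently selected at compile time. A sketch of the usual build-constraint wiring follows; the `p2p` tag name is an assumption inferred from the file layout, since the patch does not show the constraint lines themselves:

    //go:build p2p

    package p2p

    // This variant is compiled in only with `go build -tags p2p`; the sibling
    // file would carry `//go:build !p2p` and return false, letting callers
    // such as RegisterUIRoutes gate the dashboard without runtime config.
    func IsP2PEnabled() bool {
    	return true
    }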
diff --git a/docs/static/install.sh b/docs/static/install.sh
index 4efdc7578f82..61be710bc082 100644
--- a/docs/static/install.sh
+++ b/docs/static/install.sh
@@ -76,6 +76,8 @@ DOCKER_INSTALL=${DOCKER_INSTALL:-$docker_found}
 USE_AIO=${USE_AIO:-false}
 API_KEY=${API_KEY:-}
 CORE_IMAGES=${CORE_IMAGES:-false}
+P2P_TOKEN=${P2P_TOKEN:-}
+WORKER=${WORKER:-false}
 # nprocs -1
 if available nproc; then
 	procs=$(nproc)
@@ -132,7 +134,14 @@ configure_systemd() {
 	info "Adding current user to local-ai group..."
 	$SUDO usermod -a -G local-ai $(whoami)
-
+	STARTCOMMAND="run"
+	if [ "$WORKER" = true ]; then
+		if [ -n "$P2P_TOKEN" ]; then
+			STARTCOMMAND="worker p2p-llama-cpp-rpc"
+		else
+			STARTCOMMAND="worker llama-cpp-rpc"
+		fi
+	fi
 	info "Creating local-ai systemd service..."
 	cat <<EOF | $SUDO tee /etc/systemd/system/local-ai.service >/dev/null
 [Unit]
@@ -140,7 +149,7 @@ Description=LocalAI Service
 After=network-online.target
 
 [Service]
-ExecStart=$BINDIR/local-ai run
+ExecStart=$BINDIR/local-ai $STARTCOMMAND
 User=local-ai
 Group=local-ai
 Restart=always
@@ -159,6 +168,11 @@ EOF
 	$SUDO echo "THREADS=$THREADS" | $SUDO tee -a /etc/localai.env >/dev/null
 	$SUDO echo "MODELS_PATH=$MODELS_PATH" | $SUDO tee -a /etc/localai.env >/dev/null
 
+	if [ -n "$P2P_TOKEN" ]; then
+		$SUDO echo "LOCALAI_P2P_TOKEN=$P2P_TOKEN" | $SUDO tee -a /etc/localai.env >/dev/null
+		$SUDO echo "LOCALAI_P2P=true" | $SUDO tee -a /etc/localai.env >/dev/null
+	fi
+
 	SYSTEMCTL_RUNNING="$(systemctl is-system-running || true)"
 	case $SYSTEMCTL_RUNNING in
 		running|degraded)
@@ -407,6 +421,19 @@ install_docker() {
 			# exit 0
 		fi
 
+		STARTCOMMAND="run"
+		if [ "$WORKER" = true ]; then
+			if [ -n "$P2P_TOKEN" ]; then
+				STARTCOMMAND="worker p2p-llama-cpp-rpc"
+			else
+				STARTCOMMAND="worker llama-cpp-rpc"
+			fi
+		fi
+		envs=""
+		if [ -n "$P2P_TOKEN" ]; then
+			envs="-e LOCALAI_P2P_TOKEN=$P2P_TOKEN -e LOCALAI_P2P=true"
+		fi
+
 		IMAGE_TAG=
 		if [ "$HAS_CUDA" ]; then
 			IMAGE_TAG=${VERSION}-cublas-cuda12-ffmpeg
@@ -430,7 +457,8 @@ install_docker() {
 				--restart=always \
 				-e API_KEY=$API_KEY \
 				-e THREADS=$THREADS \
-				-d -p $PORT:8080 --name local-ai localai/localai:$IMAGE_TAG
+				$envs \
+				-d -p $PORT:8080 --name local-ai localai/localai:$IMAGE_TAG $STARTCOMMAND
 		elif [ "$HAS_AMD" ]; then
 			IMAGE_TAG=${VERSION}-hipblas-ffmpeg
 			# CORE
@@ -448,7 +476,8 @@ install_docker() {
 				--restart=always \
 				-e API_KEY=$API_KEY \
 				-e THREADS=$THREADS \
-				-d -p $PORT:8080 --name local-ai localai/localai:$IMAGE_TAG
+				$envs \
+				-d -p $PORT:8080 --name local-ai localai/localai:$IMAGE_TAG $STARTCOMMAND
 		elif [ "$HAS_INTEL" ]; then
 			IMAGE_TAG=${VERSION}-sycl-f32-ffmpeg
 			# CORE
@@ -465,7 +494,8 @@ install_docker() {
 				--restart=always \
 				-e API_KEY=$API_KEY \
 				-e THREADS=$THREADS \
-				-d -p $PORT:8080 --name local-ai localai/localai:$IMAGE_TAG
+				$envs \
+				-d -p $PORT:8080 --name local-ai localai/localai:$IMAGE_TAG $STARTCOMMAND
 		else
 			IMAGE_TAG=${VERSION}-ffmpeg
 			# CORE
@@ -481,7 +511,8 @@ install_docker() {
 				-e MODELS_PATH=/models \
 				-e API_KEY=$API_KEY \
 				-e THREADS=$THREADS \
-				-d -p $PORT:8080 --name local-ai localai/localai:$IMAGE_TAG
+				$envs \
+				-d -p $PORT:8080 --name local-ai localai/localai:$IMAGE_TAG $STARTCOMMAND
 		fi
 
 install_success
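Taken together, the installer changes let one script provision either role: running it with WORKER=true and a P2P_TOKEN yields a service that starts `local-ai worker p2p-llama-cpp-rpc`, WORKER=true alone yields `local-ai worker llama-cpp-rpc`, and the default remains `local-ai run`. When a token is supplied, LOCALAI_P2P_TOKEN and LOCALAI_P2P=true are also exported, either into /etc/localai.env for the systemd path or as -e flags on the docker run command line.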