Skip to content

Commit

Permalink
feat(llama.cpp): Enable decentralized, distributed inference
Browse files Browse the repository at this point in the history
As #2324 introduced distributed inferencing thanks to
@rgerganov implementation in ggerganov/llama.cpp#6829 in upstream llama.cpp, now
it is possible to distribute the workload to remote llama.cpp gRPC server.

This changeset now uses mudler/edgevpn to establish a secure, distributed network between the nodes using a shared token.
The token is generated automatically when starting the server with the `--p2p` flag, and can be used by starting the workers
with `local-ai worker p2p-llama-cpp-rpc` by passing the token via environment variable (TOKEN) or with args (--token).

As per how mudler/edgevpn works, a network is established between the server and the workers with dht and mdns discovery protocols,
the llama.cpp rpc server is automatically started and exposed to the underlying p2p network so the API server can connect on.

When the HTTP server is started, it will discover the workers in the network and automatically create the port-forwards to the service locally.
Then llama.cpp is configured to use the services.

This feature is behind the "p2p" GO_FLAGS

Signed-off-by: Ettore Di Giacinto <[email protected]>
  • Loading branch information
mudler committed May 18, 2024
1 parent 5f35e85 commit ea9a3cb
Show file tree
Hide file tree
Showing 15 changed files with 1,155 additions and 43 deletions.
25 changes: 10 additions & 15 deletions core/cli/cli.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@
package cli

import "embed"

type Context struct {
Debug bool `env:"LOCALAI_DEBUG,DEBUG" default:"false" hidden:"" help:"DEPRECATED, use --log-level=debug instead. Enable debug logging"`
LogLevel *string `env:"LOCALAI_LOG_LEVEL" enum:"error,warn,info,debug,trace" help:"Set the level of logs to output [${enum}]"`

// This field is not a command line argument/flag, the struct tag excludes it from the parsed CLI
BackendAssets embed.FS `kong:"-"`
}
import (
cliContext "github.com/go-skynet/LocalAI/core/cli/context"
"github.com/go-skynet/LocalAI/core/cli/worker"
)

var CLI struct {
Context `embed:""`
cliContext.Context `embed:""`

Run RunCMD `cmd:"" help:"Run LocalAI, this the default command if no other command is specified. Run 'local-ai run --help' for more information" default:"withargs"`
Models ModelsCMD `cmd:"" help:"Manage LocalAI models and definitions"`
TTS TTSCMD `cmd:"" help:"Convert text to speech"`
Transcript TranscriptCMD `cmd:"" help:"Convert audio to text"`
LLAMACPPWorker LLAMACPPWorkerCMD `cmd:"" help:"Run workers to distribute workload (llama.cpp-only)"`
Run RunCMD `cmd:"" help:"Run LocalAI, this the default command if no other command is specified. Run 'local-ai run --help' for more information" default:"withargs"`
Models ModelsCMD `cmd:"" help:"Manage LocalAI models and definitions"`
TTS TTSCMD `cmd:"" help:"Convert text to speech"`
Transcript TranscriptCMD `cmd:"" help:"Convert audio to text"`
Worker worker.Worker `cmd:"" help:"Run workers to distribute workload (llama.cpp-only)"`
}
11 changes: 11 additions & 0 deletions core/cli/context/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package cliContext

import "embed"

type Context struct {
Debug bool `env:"LOCALAI_DEBUG,DEBUG" default:"false" hidden:"" help:"DEPRECATED, use --log-level=debug instead. Enable debug logging"`
LogLevel *string `env:"LOCALAI_LOG_LEVEL" enum:"error,warn,info,debug,trace" help:"Set the level of logs to output [${enum}]"`

// This field is not a command line argument/flag, the struct tag excludes it from the parsed CLI
BackendAssets embed.FS `kong:"-"`
}
6 changes: 4 additions & 2 deletions core/cli/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"encoding/json"
"fmt"

cliContext "github.com/go-skynet/LocalAI/core/cli/context"

"github.com/go-skynet/LocalAI/pkg/gallery"
"github.com/rs/zerolog/log"
"github.com/schollz/progressbar/v3"
Expand All @@ -29,7 +31,7 @@ type ModelsCMD struct {
Install ModelsInstall `cmd:"" help:"Install a model from the gallery"`
}

func (ml *ModelsList) Run(ctx *Context) error {
func (ml *ModelsList) Run(ctx *cliContext.Context) error {
var galleries []gallery.Gallery
if err := json.Unmarshal([]byte(ml.Galleries), &galleries); err != nil {
log.Error().Err(err).Msg("unable to load galleries")
Expand All @@ -49,7 +51,7 @@ func (ml *ModelsList) Run(ctx *Context) error {
return nil
}

func (mi *ModelsInstall) Run(ctx *Context) error {
func (mi *ModelsInstall) Run(ctx *cliContext.Context) error {
modelName := mi.ModelArgs[0]

var galleries []gallery.Gallery
Expand Down
41 changes: 33 additions & 8 deletions core/cli/run.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package cli

import (
"context"
"fmt"
"strings"
"time"

cliContext "github.com/go-skynet/LocalAI/core/cli/context"
"github.com/go-skynet/LocalAI/core/config"
"github.com/go-skynet/LocalAI/core/http"
"github.com/go-skynet/LocalAI/core/p2p"
"github.com/go-skynet/LocalAI/core/startup"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -37,13 +40,14 @@ type RunCMD struct {
Threads int `env:"LOCALAI_THREADS,THREADS" short:"t" default:"4" help:"Number of threads used for parallel computation. Usage of the number of physical cores in the system is suggested" group:"performance"`
ContextSize int `env:"LOCALAI_CONTEXT_SIZE,CONTEXT_SIZE" default:"512" help:"Default context size for models" group:"performance"`

Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"`
CORS bool `env:"LOCALAI_CORS,CORS" help:"" group:"api"`
CORSAllowOrigins string `env:"LOCALAI_CORS_ALLOW_ORIGINS,CORS_ALLOW_ORIGINS" group:"api"`
UploadLimit int `env:"LOCALAI_UPLOAD_LIMIT,UPLOAD_LIMIT" default:"15" help:"Default upload-limit in MB" group:"api"`
APIKeys []string `env:"LOCALAI_API_KEY,API_KEY" help:"List of API Keys to enable API authentication. When this is set, all the requests must be authenticated with one of these API keys" group:"api"`
DisableWebUI bool `env:"LOCALAI_DISABLE_WEBUI,DISABLE_WEBUI" default:"false" help:"Disable webui" group:"api"`

Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"`
CORS bool `env:"LOCALAI_CORS,CORS" help:"" group:"api"`
CORSAllowOrigins string `env:"LOCALAI_CORS_ALLOW_ORIGINS,CORS_ALLOW_ORIGINS" group:"api"`
UploadLimit int `env:"LOCALAI_UPLOAD_LIMIT,UPLOAD_LIMIT" default:"15" help:"Default upload-limit in MB" group:"api"`
APIKeys []string `env:"LOCALAI_API_KEY,API_KEY" help:"List of API Keys to enable API authentication. When this is set, all the requests must be authenticated with one of these API keys" group:"api"`
DisableWebUI bool `env:"LOCALAI_DISABLE_WEBUI,DISABLE_WEBUI" default:"false" help:"Disable webui" group:"api"`
Peer2Peer bool `env:"LOCALAI_P2P,P2P" name:"p2p" default:"false" help:"Enable P2P mode" group:"p2p"`
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
ParallelRequests bool `env:"LOCALAI_PARALLEL_REQUESTS,PARALLEL_REQUESTS" help:"Enable backends to handle multiple requests in parallel if they support it (e.g.: llama.cpp or vllm)" group:"backends"`
SingleActiveBackend bool `env:"LOCALAI_SINGLE_ACTIVE_BACKEND,SINGLE_ACTIVE_BACKEND" help:"Allow only one backend to be run at a time" group:"backends"`
PreloadBackendOnly bool `env:"LOCALAI_PRELOAD_BACKEND_ONLY,PRELOAD_BACKEND_ONLY" default:"false" help:"Do not launch the API services, only the preloaded models / backends are started (useful for multi-node setups)" group:"backends"`
Expand All @@ -54,7 +58,7 @@ type RunCMD struct {
WatchdogBusyTimeout string `env:"LOCALAI_WATCHDOG_BUSY_TIMEOUT,WATCHDOG_BUSY_TIMEOUT" default:"5m" help:"Threshold beyond which a busy backend should be stopped" group:"backends"`
}

func (r *RunCMD) Run(ctx *Context) error {
func (r *RunCMD) Run(ctx *cliContext.Context) error {
opts := []config.AppOption{
config.WithConfigFile(r.ModelsConfigFile),
config.WithJSONStringPreload(r.PreloadModels),
Expand All @@ -81,6 +85,27 @@ func (r *RunCMD) Run(ctx *Context) error {
config.WithModelsURL(append(r.Models, r.ModelArgs...)...),
}

if r.Peer2Peer || r.Peer2PeerToken != "" {
log.Info().Msg("P2P mode enabled")
token := r.Peer2PeerToken
if token == "" {
// IF no token is provided, and p2p is enabled,
// we generate one and wait for the user to pick up the token (this is for interactive)
log.Info().Msg("No token provided, generating one")
token = p2p.GenerateToken()
log.Info().Msg("Generated Token:")
fmt.Println(token)
// Ask for user confirmation
log.Info().Msg("Press a button to proceed")
var input string
fmt.Scanln(&input)
}
log.Info().Msg("Starting P2P server discovery...")
if err := p2p.LLamaCPPRPCServerDiscoverer(context.Background(), token); err != nil {
return err
}
}

idleWatchDog := r.EnableWatchdogIdle
busyWatchDog := r.EnableWatchdogBusy

Expand Down
3 changes: 2 additions & 1 deletion core/cli/transcript.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"

"github.com/go-skynet/LocalAI/core/backend"
cliContext "github.com/go-skynet/LocalAI/core/cli/context"
"github.com/go-skynet/LocalAI/core/config"
"github.com/go-skynet/LocalAI/pkg/model"
"github.com/rs/zerolog/log"
Expand All @@ -22,7 +23,7 @@ type TranscriptCMD struct {
BackendAssetsPath string `env:"LOCALAI_BACKEND_ASSETS_PATH,BACKEND_ASSETS_PATH" type:"path" default:"/tmp/localai/backend_data" help:"Path used to extract libraries that are required by some of the backends in runtime" group:"storage"`
}

func (t *TranscriptCMD) Run(ctx *Context) error {
func (t *TranscriptCMD) Run(ctx *cliContext.Context) error {
opts := &config.ApplicationConfig{
ModelPath: t.ModelsPath,
Context: context.Background(),
Expand Down
3 changes: 2 additions & 1 deletion core/cli/tts.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"

"github.com/go-skynet/LocalAI/core/backend"
cliContext "github.com/go-skynet/LocalAI/core/cli/context"
"github.com/go-skynet/LocalAI/core/config"
"github.com/go-skynet/LocalAI/pkg/model"
"github.com/rs/zerolog/log"
Expand All @@ -24,7 +25,7 @@ type TTSCMD struct {
BackendAssetsPath string `env:"LOCALAI_BACKEND_ASSETS_PATH,BACKEND_ASSETS_PATH" type:"path" default:"/tmp/localai/backend_data" help:"Path used to extract libraries that are required by some of the backends in runtime" group:"storage"`
}

func (t *TTSCMD) Run(ctx *Context) error {
func (t *TTSCMD) Run(ctx *cliContext.Context) error {
outputFile := t.OutputFile
outputDir := t.BackendAssetsPath
if outputFile != "" {
Expand Down
10 changes: 10 additions & 0 deletions core/cli/worker/worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package worker

type WorkerFlags struct {
BackendAssetsPath string `env:"LOCALAI_BACKEND_ASSETS_PATH,BACKEND_ASSETS_PATH" type:"path" default:"/tmp/localai/backend_data" help:"Path used to extract libraries that are required by some of the backends in runtime" group:"storage"`
}

type Worker struct {
P2P P2P `cmd:"" name:"p2p-llama-cpp-rpc" help:"Starts a LocalAI llama.cpp worker in P2P mode (requires a token)"`
LLamaCPP LLamaCPP `cmd:"" name:"llama-cpp-rpc" help:"Starts a llama.cpp worker in standalone mode"`
}
18 changes: 12 additions & 6 deletions core/cli/llamacppworker.go → core/cli/worker/worker_llamacpp.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,32 @@
package cli
package worker

import (
"fmt"
"os"
"syscall"

cliContext "github.com/go-skynet/LocalAI/core/cli/context"
"github.com/go-skynet/LocalAI/pkg/assets"
"github.com/rs/zerolog/log"
)

type LLAMACPPWorkerCMD struct {
Args []string `arg:"" optional:"" name:"models" help:"Worker arguments: host port"`
BackendAssetsPath string `env:"LOCALAI_BACKEND_ASSETS_PATH,BACKEND_ASSETS_PATH" type:"path" default:"/tmp/localai/backend_data" help:"Path used to extract libraries that are required by some of the backends in runtime" group:"storage"`
type LLamaCPP struct {
Args []string `arg:"" optional:"" name:"models" help:"Model configuration URLs to load"`
WorkerFlags `embed:""`
}

func (r *LLAMACPPWorkerCMD) Run(ctx *Context) error {
func (r *LLamaCPP) Run(ctx *cliContext.Context) error {
// Extract files from the embedded FS
err := assets.ExtractFiles(ctx.BackendAssets, r.BackendAssetsPath)
log.Debug().Msgf("Extracting backend assets files to %s", r.BackendAssetsPath)
if err != nil {
log.Warn().Msgf("Failed extracting backend assets files: %s (might be required for some backends to work properly, like gpt4all)", err)
}

if len(os.Args) < 4 {
return fmt.Errorf("usage: local-ai worker llama-cpp-rpc -- <llama-rpc-server-args>")
}

return syscall.Exec(
assets.ResolvePath(
r.BackendAssetsPath,
Expand All @@ -32,6 +38,6 @@ func (r *LLAMACPPWorkerCMD) Run(ctx *Context) error {
r.BackendAssetsPath,
"util",
"llama-cpp-rpc-server",
)}, r.Args...),
)}, os.Args[4:]...),
os.Environ())
}
16 changes: 16 additions & 0 deletions core/cli/worker/worker_nop2p.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
//go:build !p2p
// +build !p2p

package worker

import (
"fmt"

cliContext "github.com/go-skynet/LocalAI/core/cli/context"
)

type P2P struct{}

func (r *P2P) Run(ctx *cliContext.Context) error {
return fmt.Errorf("p2p mode is not enabled in this build")
}
104 changes: 104 additions & 0 deletions core/cli/worker/worker_p2p.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
//go:build p2p
// +build p2p

package worker

import (
"context"
"fmt"
"os"
"os/exec"
"time"

cliContext "github.com/go-skynet/LocalAI/core/cli/context"
"github.com/go-skynet/LocalAI/core/p2p"
"github.com/go-skynet/LocalAI/pkg/assets"
"github.com/phayes/freeport"
"github.com/rs/zerolog/log"
)

type P2P struct {
WorkerFlags `embed:""`
Token string `env:"LOCALAI_TOKEN,TOKEN" help:"JSON list of galleries"`
NoRunner bool `env:"LOCALAI_NO_RUNNER,NO_RUNNER" help:"Do not start the llama-cpp-rpc-server"`
RunnerAddress string `env:"LOCALAI_RUNNER_ADDRESS,RUNNER_ADDRESS" help:"Address of the llama-cpp-rpc-server"`
RunnerPort string `env:"LOCALAI_RUNNER_PORT,RUNNER_PORT" help:"Port of the llama-cpp-rpc-server"`
ExtraLLamaCPPArgs []string `env:"LOCALAI_EXTRA_LLAMA_CPP_ARGS,EXTRA_LLAMA_CPP_ARGS" help:"Extra arguments to pass to llama-cpp-rpc-server"`
}

func (r *P2P) Run(ctx *cliContext.Context) error {
// Extract files from the embedded FS
err := assets.ExtractFiles(ctx.BackendAssets, r.BackendAssetsPath)
log.Debug().Msgf("Extracting backend assets files to %s", r.BackendAssetsPath)
if err != nil {
log.Warn().Msgf("Failed extracting backend assets files: %s (might be required for some backends to work properly, like gpt4all)", err)
}

// Check if the token is set
// as we always need it.
if r.Token == "" {
return fmt.Errorf("Token is required")
}

port, err := freeport.GetFreePort()
if err != nil {
return err
}

address := "127.0.0.1"

if r.NoRunner {
// Let override which port and address to bind if the user
// configure the llama-cpp service on its own
p := fmt.Sprint(port)
if r.RunnerAddress != "" {
address = r.RunnerAddress
}
if r.RunnerPort != "" {
p = r.RunnerPort
}

err = p2p.BindLLamaCPPWorker(context.Background(), address, p, r.Token)
if err != nil {
return err
}
log.Info().Msgf("You need to start llama-cpp-rpc-server on '%s:%s'", address, p)

return nil
}

// Start llama.cpp directly from the version we have pre-packaged
go func() {
for {
log.Info().Msgf("Starting llama-cpp-rpc-server on '%s:%d'", address, port)
cmd := exec.Command(
assets.ResolvePath(
r.BackendAssetsPath,
"util",
"llama-cpp-rpc-server",
),
append([]string{"--host", address, "--port", fmt.Sprint(port)}, r.ExtraLLamaCPPArgs...)...,
)

cmd.Env = os.Environ()

cmd.Stderr = os.Stdout
cmd.Stdout = os.Stdout

if err := cmd.Start(); err != nil {
log.Error().Err(err).Msg("Failed to start llama-cpp-rpc-server")
}

cmd.Wait()
}
}()

err = p2p.BindLLamaCPPWorker(context.Background(), address, fmt.Sprint(port), r.Token)
if err != nil {
return err
}

for {
time.Sleep(1 * time.Second)
}
}
Loading

0 comments on commit ea9a3cb

Please sign in to comment.