Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore: use SSE to deliver change event for knowledgefiles #1106

Merged
merged 3 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 72 additions & 1 deletion pkg/api/handlers/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package handlers

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
Expand All @@ -14,10 +15,13 @@ import (
"github.com/obot-platform/obot/pkg/api"
"github.com/obot-platform/obot/pkg/invoke"
"github.com/obot-platform/obot/pkg/render"
"github.com/obot-platform/obot/pkg/storage"
v1 "github.com/obot-platform/obot/pkg/storage/apis/otto.otto8.ai/v1"
"github.com/obot-platform/obot/pkg/storage/selectors"
"github.com/obot-platform/obot/pkg/system"
"github.com/obot-platform/obot/pkg/wait"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
kclient "sigs.k8s.io/controller-runtime/pkg/client"
)

Expand All @@ -27,14 +31,16 @@ type AgentHandler struct {
serverURL string
// This is currently a hack to access the workflow handler
workflowHandler *WorkflowHandler
client storage.Client
}

func NewAgentHandler(gClient *gptscript.GPTScript, invoker *invoke.Invoker, serverURL string) *AgentHandler {
func NewAgentHandler(gClient *gptscript.GPTScript, invoker *invoke.Invoker, serverURL string, client storage.Client) *AgentHandler {
return &AgentHandler{
serverURL: serverURL,
gptscript: gClient,
invoker: invoker,
workflowHandler: NewWorkflowHandler(gClient, serverURL, invoker),
client: client,
}
}

Expand Down Expand Up @@ -844,6 +850,71 @@ func (a *AgentHandler) Script(req api.Context) error {
return req.Write(script)
}

func (a *AgentHandler) WatchKnowledgeFile(req api.Context) error {
knowledgeSetNames, agentName, err := a.getKnowledgeSetsAndName(req, req.PathValue("agent_id"))
if err != nil {
return err
}

if len(knowledgeSetNames) == 0 {
return req.Write(types.KnowledgeFileList{Items: []types.KnowledgeFile{}})
}

knowledgeSourceName := req.PathValue("knowledge_source_id")
var knowledgeSource *v1.KnowledgeSource
if knowledgeSourceName != "" {
knowledgeSource = &v1.KnowledgeSource{}
if err := req.Get(knowledgeSource, knowledgeSourceName); err != nil {
return err
}
if knowledgeSource.Spec.KnowledgeSetName != knowledgeSetNames[0] {
return types.NewErrBadRequest("knowledgeSource %q does not belong to agent %q", knowledgeSource.Name, agentName)
}
}

w, err := a.client.Watch(req.Context(), &v1.KnowledgeFileList{}, kclient.InNamespace(req.Namespace()),
&kclient.ListOptions{
FieldSelector: fields.SelectorFromSet(selectors.RemoveEmpty(map[string]string{
"spec.knowledgeSetName": knowledgeSetNames[0],
"spec.knowledgeSourceName": knowledgeSourceName,
})),
})
if err != nil {
return err
}
defer func() {
w.Stop()
//nolint:revive
for range w.ResultChan() {
}
}()

req.ResponseWriter.Header().Set("Content-Type", "text/event-stream")
defer func() {
_ = req.WriteDataEvent(api.EventClose{})
}()

for event := range w.ResultChan() {
if knowledgeFile, ok := event.Object.(*v1.KnowledgeFile); ok {
payload := map[string]interface{}{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
payload := map[string]interface{}{
payload := map[string]any{

"eventType": event.Type,
"knowledgeFile": convertKnowledgeFile(agentName, "", *knowledgeFile),
}
data, err := json.Marshal(payload)
if err != nil {
return err
}
sseEvent := fmt.Sprintf("data: %s\n\n", data)
if _, err := req.ResponseWriter.Write([]byte(sseEvent)); err != nil {
return err
}
req.Flush()
}
}

return nil
}

func MetadataFrom(obj kclient.Object, linkKV ...string) types.Metadata {
m := types.Metadata{
ID: obj.GetName(),
Expand Down
3 changes: 2 additions & 1 deletion pkg/api/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func Router(services *services.Services) (http.Handler, error) {
mux := services.APIServer

agents := handlers.NewAgentHandler(services.GPTClient, services.Invoker, services.ServerURL)
agents := handlers.NewAgentHandler(services.GPTClient, services.Invoker, services.ServerURL, services.StorageClient)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The api.Context object passed to each of these handlers already has a client. Can we just use that one instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call! addressed

assistants := handlers.NewAssistantHandler(services.Invoker, services.Events, services.GPTClient)
tools := handlers.NewToolHandler(services.GPTClient)
tasks := handlers.NewTaskHandler(services.Invoker, services.Events)
Expand Down Expand Up @@ -141,6 +141,7 @@ func Router(services *services.Services) (http.Handler, error) {
mux.HandleFunc("DELETE /api/agents/{agent_id}/knowledge-sources/{id}", agents.DeleteKnowledgeSource)
mux.HandleFunc("GET /api/agents/{agent_id}/knowledge-sources/{knowledge_source_id}/knowledge-files", agents.ListKnowledgeFiles)
mux.HandleFunc("POST /api/agents/{agent_id}/knowledge-sources/{knowledge_source_id}/knowledge-files/{file_id}/ingest", agents.ReIngestKnowledgeFile)
mux.HandleFunc("GET /api/agents/{agent_id}/knowledge-sources/{knowledge_source_id}/knowledge-files/watch", agents.WatchKnowledgeFile)

// Workflows
mux.HandleFunc("GET /api/workflows", workflows.List)
Expand Down
3 changes: 0 additions & 3 deletions ui/admin/app/components/knowledge/FileTree.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,6 @@ export default function FileTreeNode({
filePathPrefixInclude,
filePathPrefixExclude,
});

// We should manually approve/unapprove all files in the folder at once so that we don't rely on backend reconciliation logic as it will cause delay in updating the UI.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems not ideal to approve files in frontend just for fast feedback from UI. I think it is ok to just rely on backend to reconcile all the files to be approved.

onApproveFile(!included, node);
};

return (
Expand Down
75 changes: 51 additions & 24 deletions ui/admin/app/hooks/knowledge/useKnowledgeSourceFiles.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { useEffect, useMemo, useState } from "react";
import useSWR from "swr";

import {
KnowledgeFile,
KnowledgeFileEvent,
KnowledgeFileState,
KnowledgeSource,
KnowledgeSourceNamespace,
Expand All @@ -29,27 +29,56 @@ export function useKnowledgeSourceFiles(
startPolling();
}

const {
data: files,
mutate: mutateFiles,
...rest
} = useSWR(
KnowledgeSourceApiService.getFilesForKnowledgeSource.key(
namespace,
agentId,
knowledgeSource.id
),
({ agentId, sourceId }) =>
KnowledgeSourceApiService.getFilesForKnowledgeSource(
const [files, setFiles] = useState<KnowledgeFile[]>([]);

useEffect(() => {
const eventSource =
KnowledgeSourceApiService.getKnowledgeSourceFilesEventSource(
namespace,
agentId,
sourceId
),
{
revalidateOnFocus: false,
refreshInterval: blockPollingFiles ? undefined : 5000,
}
);
knowledgeSource.id
);

eventSource.onmessage = (event) => {
const { eventType, knowledgeFile } = JSON.parse(
event.data
) as KnowledgeFileEvent;

setFiles((prevFiles) => {
let updatedFiles = [...prevFiles];
switch (eventType) {
case "ADDED":
case "MODIFIED":
{
const existingIndex = updatedFiles.findIndex(
(file) => file.id === knowledgeFile.id
);
if (existingIndex !== -1) {
updatedFiles[existingIndex] = knowledgeFile;
} else {
updatedFiles.push(knowledgeFile);
}
}
break;
case "DELETED":
{
updatedFiles = updatedFiles.filter(
(file) => file.id !== knowledgeFile.id
);
}
break;
default:
break;
}
return updatedFiles;
});
};

return () => {
setFiles([]);
eventSource.close();
};
}, [knowledgeSource.id, namespace, agentId]);

const sortedFiles = useMemo(() => {
return (
Expand Down Expand Up @@ -90,7 +119,7 @@ export function useKnowledgeSourceFiles(
knowledgeSource.id,
fileId
);
mutateFiles((prev) =>
setFiles((prev) =>
prev?.map((f) => (f.id === fileId ? updatedFile : f))
);
};
Expand All @@ -109,7 +138,7 @@ export function useKnowledgeSourceFiles(
console.error("Failed to approve file", error);
}

mutateFiles((prev) =>
setFiles((prev) =>
prev?.map((f) => (f.id === file.id ? (updatedFile ?? file) : f))
);
};
Expand All @@ -118,8 +147,6 @@ export function useKnowledgeSourceFiles(
files: sortedFiles,
reingestFile,
approveFile,
mutateFiles,
startPollingFiles: startPolling,
...rest,
};
}
5 changes: 5 additions & 0 deletions ui/admin/app/lib/model/knowledge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ export type KnowledgeFile = {
sizeInBytes?: number;
};

export type KnowledgeFileEvent = {
eventType: "ADDED" | "MODIFIED" | "DELETED";
knowledgeFile: KnowledgeFile;
};

export function getRemoteFileDisplayName(item: KnowledgeFile) {
return item.fileName;
}
Expand Down
8 changes: 8 additions & 0 deletions ui/admin/app/lib/routers/apiRoutes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,14 @@ export const ApiRoutes = {
entityId: string,
fileName: string
) => buildUrl(`/${namespace}/${entityId}/approve-file/${fileName}`),
watchKnowledgeSourceFiles: (
namespace: KnowledgeSourceNamespace,
entityId: string,
sourceId: string
) =>
buildUrl(
`/${namespace}/${entityId}/knowledge-sources/${sourceId}/knowledge-files/watch`
),
},
knowledgeFiles: {
getKnowledgeFiles: (
Expand Down
15 changes: 15 additions & 0 deletions ui/admin/app/lib/service/api/knowledgeSourceApiService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,28 @@ async function deleteKnowledgeSource(
});
}

function getKnowledgeSourceFilesEventSource(
namespace: KnowledgeSourceNamespace,
agentId: string,
sourceId: string
) {
return new EventSource(
ApiRoutes.knowledgeSources.watchKnowledgeSourceFiles(
namespace,
agentId,
sourceId
).url
);
}

export const KnowledgeSourceApiService = {
approveFile,
createKnowledgeSource,
updateKnowledgeSource,
resyncKnowledgeSource,
getKnowledgeSources,
getFilesForKnowledgeSource,
getKnowledgeSourceFilesEventSource,
reingestFileFromSource,
deleteKnowledgeSource,
};