Skip to content

Commit

Permalink
Chore: use SSE to deliver change event for knowledgefiles (#1106)
Browse files Browse the repository at this point in the history
* Chore: use SSE to deliver change event for knowledgefiles

Signed-off-by: Daishan Peng <[email protected]>
  • Loading branch information
StrongMonkey authored Jan 6, 2025
1 parent 080815f commit dd853d9
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 27 deletions.
68 changes: 68 additions & 0 deletions pkg/api/handlers/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package handlers

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
Expand All @@ -15,9 +16,11 @@ import (
"github.com/obot-platform/obot/pkg/invoke"
"github.com/obot-platform/obot/pkg/render"
v1 "github.com/obot-platform/obot/pkg/storage/apis/otto.otto8.ai/v1"
"github.com/obot-platform/obot/pkg/storage/selectors"
"github.com/obot-platform/obot/pkg/system"
"github.com/obot-platform/obot/pkg/wait"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
kclient "sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -844,6 +847,71 @@ func (a *AgentHandler) Script(req api.Context) error {
return req.Write(script)
}

func (a *AgentHandler) WatchKnowledgeFile(req api.Context) error {
knowledgeSetNames, agentName, err := a.getKnowledgeSetsAndName(req, req.PathValue("agent_id"))
if err != nil {
return err
}

if len(knowledgeSetNames) == 0 {
return req.Write(types.KnowledgeFileList{Items: []types.KnowledgeFile{}})
}

knowledgeSourceName := req.PathValue("knowledge_source_id")
var knowledgeSource *v1.KnowledgeSource
if knowledgeSourceName != "" {
knowledgeSource = &v1.KnowledgeSource{}
if err := req.Get(knowledgeSource, knowledgeSourceName); err != nil {
return err
}
if knowledgeSource.Spec.KnowledgeSetName != knowledgeSetNames[0] {
return types.NewErrBadRequest("knowledgeSource %q does not belong to agent %q", knowledgeSource.Name, agentName)
}
}

w, err := req.Storage.Watch(req.Context(), &v1.KnowledgeFileList{}, kclient.InNamespace(req.Namespace()),
&kclient.ListOptions{
FieldSelector: fields.SelectorFromSet(selectors.RemoveEmpty(map[string]string{
"spec.knowledgeSetName": knowledgeSetNames[0],
"spec.knowledgeSourceName": knowledgeSourceName,
})),
})
if err != nil {
return err
}
defer func() {
w.Stop()
//nolint:revive
for range w.ResultChan() {
}
}()

req.ResponseWriter.Header().Set("Content-Type", "text/event-stream")
defer func() {
_ = req.WriteDataEvent(api.EventClose{})
}()

for event := range w.ResultChan() {
if knowledgeFile, ok := event.Object.(*v1.KnowledgeFile); ok {
payload := map[string]any{
"eventType": event.Type,
"knowledgeFile": convertKnowledgeFile(agentName, "", *knowledgeFile),
}
data, err := json.Marshal(payload)
if err != nil {
return err
}
sseEvent := fmt.Sprintf("data: %s\n\n", data)
if _, err := req.ResponseWriter.Write([]byte(sseEvent)); err != nil {
return err
}
req.Flush()
}
}

return nil
}

func MetadataFrom(obj kclient.Object, linkKV ...string) types.Metadata {
m := types.Metadata{
ID: obj.GetName(),
Expand Down
1 change: 1 addition & 0 deletions pkg/api/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func Router(services *services.Services) (http.Handler, error) {
mux.HandleFunc("DELETE /api/agents/{agent_id}/knowledge-sources/{id}", agents.DeleteKnowledgeSource)
mux.HandleFunc("GET /api/agents/{agent_id}/knowledge-sources/{knowledge_source_id}/knowledge-files", agents.ListKnowledgeFiles)
mux.HandleFunc("POST /api/agents/{agent_id}/knowledge-sources/{knowledge_source_id}/knowledge-files/{file_id}/ingest", agents.ReIngestKnowledgeFile)
mux.HandleFunc("GET /api/agents/{agent_id}/knowledge-sources/{knowledge_source_id}/knowledge-files/watch", agents.WatchKnowledgeFile)

// Workflows
mux.HandleFunc("GET /api/workflows", workflows.List)
Expand Down
3 changes: 0 additions & 3 deletions ui/admin/app/components/knowledge/FileTree.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,6 @@ export default function FileTreeNode({
filePathPrefixInclude,
filePathPrefixExclude,
});

// We should manually approve/unapprove all files in the folder at once so that we don't rely on backend reconciliation logic as it will cause delay in updating the UI.
onApproveFile(!included, node);
};

return (
Expand Down
75 changes: 51 additions & 24 deletions ui/admin/app/hooks/knowledge/useKnowledgeSourceFiles.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { useEffect, useMemo, useState } from "react";
import useSWR from "swr";

import {
KnowledgeFile,
KnowledgeFileEvent,
KnowledgeFileState,
KnowledgeSource,
KnowledgeSourceNamespace,
Expand All @@ -29,27 +29,56 @@ export function useKnowledgeSourceFiles(
startPolling();
}

const {
data: files,
mutate: mutateFiles,
...rest
} = useSWR(
KnowledgeSourceApiService.getFilesForKnowledgeSource.key(
namespace,
agentId,
knowledgeSource.id
),
({ agentId, sourceId }) =>
KnowledgeSourceApiService.getFilesForKnowledgeSource(
const [files, setFiles] = useState<KnowledgeFile[]>([]);

useEffect(() => {
const eventSource =
KnowledgeSourceApiService.getKnowledgeSourceFilesEventSource(
namespace,
agentId,
sourceId
),
{
revalidateOnFocus: false,
refreshInterval: blockPollingFiles ? undefined : 5000,
}
);
knowledgeSource.id
);

eventSource.onmessage = (event) => {
const { eventType, knowledgeFile } = JSON.parse(
event.data
) as KnowledgeFileEvent;

setFiles((prevFiles) => {
let updatedFiles = [...prevFiles];
switch (eventType) {
case "ADDED":
case "MODIFIED":
{
const existingIndex = updatedFiles.findIndex(
(file) => file.id === knowledgeFile.id
);
if (existingIndex !== -1) {
updatedFiles[existingIndex] = knowledgeFile;
} else {
updatedFiles.push(knowledgeFile);
}
}
break;
case "DELETED":
{
updatedFiles = updatedFiles.filter(
(file) => file.id !== knowledgeFile.id
);
}
break;
default:
break;
}
return updatedFiles;
});
};

return () => {
setFiles([]);
eventSource.close();
};
}, [knowledgeSource.id, namespace, agentId]);

const sortedFiles = useMemo(() => {
return (
Expand Down Expand Up @@ -90,7 +119,7 @@ export function useKnowledgeSourceFiles(
knowledgeSource.id,
fileId
);
mutateFiles((prev) =>
setFiles((prev) =>
prev?.map((f) => (f.id === fileId ? updatedFile : f))
);
};
Expand All @@ -109,7 +138,7 @@ export function useKnowledgeSourceFiles(
console.error("Failed to approve file", error);
}

mutateFiles((prev) =>
setFiles((prev) =>
prev?.map((f) => (f.id === file.id ? (updatedFile ?? file) : f))
);
};
Expand All @@ -118,8 +147,6 @@ export function useKnowledgeSourceFiles(
files: sortedFiles,
reingestFile,
approveFile,
mutateFiles,
startPollingFiles: startPolling,
...rest,
};
}
5 changes: 5 additions & 0 deletions ui/admin/app/lib/model/knowledge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ export type KnowledgeFile = {
sizeInBytes?: number;
};

export type KnowledgeFileEvent = {
eventType: "ADDED" | "MODIFIED" | "DELETED";
knowledgeFile: KnowledgeFile;
};

export function getRemoteFileDisplayName(item: KnowledgeFile) {
return item.fileName;
}
Expand Down
8 changes: 8 additions & 0 deletions ui/admin/app/lib/routers/apiRoutes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,14 @@ export const ApiRoutes = {
entityId: string,
fileName: string
) => buildUrl(`/${namespace}/${entityId}/approve-file/${fileName}`),
watchKnowledgeSourceFiles: (
namespace: KnowledgeSourceNamespace,
entityId: string,
sourceId: string
) =>
buildUrl(
`/${namespace}/${entityId}/knowledge-sources/${sourceId}/knowledge-files/watch`
),
},
knowledgeFiles: {
getKnowledgeFiles: (
Expand Down
15 changes: 15 additions & 0 deletions ui/admin/app/lib/service/api/knowledgeSourceApiService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,28 @@ async function deleteKnowledgeSource(
});
}

function getKnowledgeSourceFilesEventSource(
namespace: KnowledgeSourceNamespace,
agentId: string,
sourceId: string
) {
return new EventSource(
ApiRoutes.knowledgeSources.watchKnowledgeSourceFiles(
namespace,
agentId,
sourceId
).url
);
}

export const KnowledgeSourceApiService = {
approveFile,
createKnowledgeSource,
updateKnowledgeSource,
resyncKnowledgeSource,
getKnowledgeSources,
getFilesForKnowledgeSource,
getKnowledgeSourceFilesEventSource,
reingestFileFromSource,
deleteKnowledgeSource,
};

0 comments on commit dd853d9

Please sign in to comment.