Skip to content

Commit

Permalink
fix(dashboard-v2): failed to render because API is changed (#2368)
Browse files Browse the repository at this point in the history
* fix: failed to render because of data renaming

* remove tmp info for debugging
  • Loading branch information
cloudcarver authored May 8, 2022
1 parent 9fd7119 commit 3e8fe1e
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 18 deletions.
1 change: 0 additions & 1 deletion dashboard/components/StreamingView.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ const PopupBox = styled("div")({
alignItems: "center",
backgroundColor: "white",
borderRadius: "20px",
boxShadow: "14px 14px 28px #e6e6e6, -14px -14px 28px #ffffff"
});

const PopupBoxHeader = styled("div")({
Expand Down
2 changes: 1 addition & 1 deletion dashboard/lib/graaphEngine/canvasEngine.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ export class Group extends DrawElement {
_appendText = () => {
return (content) => new Text({
...this.basicSetting, ...{
canvasElement: new fabric.Text(content, { selectable: false, textAlign: "justify-center" })
canvasElement: new fabric.Text(content || "undefined", { selectable: false, textAlign: "justify-center" })
}
});
}
Expand Down
40 changes: 27 additions & 13 deletions dashboard/lib/streamPlan/parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ function generateNewNodeId() {
return "g" + (++cnt);
}

function getNodeId(nodeProto) {
return nodeProto.operatorId === undefined ? generateNewNodeId() : "o" + nodeProto.operatorId;
function getNodeId(nodeProto, actorId) {
return actorId + ":" + (nodeProto.operatorId === undefined ? generateNewNodeId() : "o" + nodeProto.operatorId);
}

class Node {
Expand Down Expand Up @@ -55,9 +55,9 @@ class StreamNode extends Node {
}

parseType(nodeProto) {
let typeReg = new RegExp('.+Node');
let types = new Set(["source", "project", "filter", "materialize", "localSimpleAgg", "globalSimpleAgg", "hashAgg", "appendOnlyTopN", "hashJoin", "topN", "hopWindow", "merge", "exchange", "chain", "batchPlan", "lookup", "arrange", "lookupUnion", "union", "deltaIndexJoin"]);
for (let [type, _] of Object.entries(nodeProto)) {
if (typeReg.test(type)) {
if (types.has(type)) {
return type;
}
}
Expand Down Expand Up @@ -144,15 +144,27 @@ export default class StreamPlanParser {
this.parsedActorList.push(actor);
}

/** @type {Set<Actor>} */
this.fragmentRepresentedActors = this._constructRepresentedActorList();

/**
* @type {Map<number, Array<number>}
*/
/** @type {Map<number, Array<number>} */
this.mvTableIdToSingleViewActorList = this._constructSingleViewMvList();

/** @type {Map<number, Array<number>} */
this.mvTableIdToChainViewActorList = this._constructChainViewMvList()
}


/**
* Randomly select a actor to represent its
* fragment, and append a property named `representedActorList`
* to store all the other actors in the same fragement.
*
* Actors are degree of parallelism of a fragment, such that one of
* the actor in a fragement can represent all the other actor in
* the same fragment.
*
* @returns A Set containing actors representing its fragment.
*/
_constructRepresentedActorList() {
const fragmentId2actorList = new Map();
let fragmentRepresentedActors = new Set();
Expand All @@ -164,10 +176,12 @@ export default class StreamPlanParser {
fragmentId2actorList.get(actor.fragmentId).push(actor);
}
}

for (let actor of fragmentRepresentedActors) {
actor.representedActorList = cloneDeep(fragmentId2actorList.get(actor.fragmentId)).sort(x => x.actorId);
actor.representedActorList = fragmentId2actorList.get(actor.fragmentId).sort(x => x.actorId);
actor.representedWorkNodes = new Set();
for (let representedActor of actor.representedActorList) {
representedActor.representedActorList = actor.representedActorList;
actor.representedWorkNodes.add(representedActor.computeNodeAddress);
}
}
Expand Down Expand Up @@ -300,7 +314,7 @@ export default class StreamPlanParser {
}

parseNode(actorId, nodeProto) {
let id = getNodeId(nodeProto);
let id = getNodeId(nodeProto, actorId);
if (this.parsedNodeMap.has(id)) {
return this.parsedNodeMap.get(id);
}
Expand All @@ -313,7 +327,7 @@ export default class StreamPlanParser {
}
}

if (newNode.type === "mergeNode" && newNode.typeInfo.upstreamActorId) {
if (newNode.type === "merge" && newNode.typeInfo.upstreamActorId) {
for (let upStreamActorId of newNode.typeInfo.upstreamActorId) {
if (!this.actorId2Proto.has(upStreamActorId)) {
continue;
Expand All @@ -322,7 +336,7 @@ export default class StreamPlanParser {
}
}

if (newNode.type === "chainNode" && newNode.typeInfo.upstreamActorIds) {
if (newNode.type === "chain" && newNode.typeInfo.upstreamActorIds) {
for (let upStreamActorId of newNode.typeInfo.upstreamActorIds) {
if (!this.actorId2Proto.has(upStreamActorId)) {
continue;
Expand All @@ -331,7 +345,7 @@ export default class StreamPlanParser {
}
}

if (newNode.type === "materializeNode") {
if (newNode.type === "materialize") {
this.actorIdTomviewNodes.set(actorId, newNode);
}

Expand Down
7 changes: 4 additions & 3 deletions dashboard/lib/streamPlan/streamChartHelper.js
Original file line number Diff line number Diff line change
Expand Up @@ -802,9 +802,10 @@ export class StreamChartHelper {
}
for (let actor of fragmentRepresentedActors) {
for (let outputActorNode of actor.output) {
dagNodeMap.get(actor.actorId).nextNodes.push(
dagNodeMap.get(outputActorNode.actorId)
)
let outputDagNode = dagNodeMap.get(outputActorNode.actorId);
if(outputDagNode){ // the output actor node is in a represented actor
dagNodeMap.get(actor.actorId).nextNodes.push(outputDagNode)
}
}
}
let actorDagNodes = [];
Expand Down

0 comments on commit 3e8fe1e

Please sign in to comment.