Skip to content

Commit

Permalink
Synchronization fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ekuiter committed Jun 8, 2019
1 parent 65829f3 commit 0493ad2
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 27 deletions.
2 changes: 1 addition & 1 deletion client/src/store/reducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ function serverReceiveReducer(state: State, action: Action): State {
enqueueMessage({type: MessageType.KERNEL, message: heartbeat}, artifactPath);
deferred(flushMessageQueue)();
}
return {...collaborativeSession, kernelContext, kernelCombinedEffect}; // TODO: do not change on heartbeats
return {...collaborativeSession, kernelContext, kernelCombinedEffect};
}));
if (isEditingFeatureModel(state))
state = updateFeatureModel(state, action.payload.artifactPath!);
Expand Down
26 changes: 13 additions & 13 deletions kernel/src/kernel/core/conflict_resolution.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,24 @@
site need not already have seen the same conflicts as the checked site)."
[GC site-ID HB CC]
(p ::conflict-aware?
(let [res (some #(let [CO-ID-a (first %)
CO-ID-b (first (disj % CO-ID-a))]
(and (VC/_< ((HB/lookup HB CO-ID-a) :VC) (GC/get-site-VC GC site-ID))
(VC/_< ((HB/lookup HB CO-ID-b) :VC) (GC/get-site-VC GC site-ID))))
(CC/get-all-conflicts CC))]
(log "conflict-aware?" site-ID res)
res)))
(boolean
(some #(let [CO-ID-a (first %)
CO-ID-b (first (disj % CO-ID-a))]
(and (VC/_< ((HB/lookup HB CO-ID-a) :VC) (GC/get-site-VC GC site-ID))
(VC/_< ((HB/lookup HB CO-ID-b) :VC) (GC/get-site-VC GC site-ID))))
(CC/get-all-conflicts CC)))))

(defn synchronized?
"Determines whether all sites are aware of a conflict, i.e., are in conflict resolution mode.
Until synchronization, no resolution and voting is allowed.
When all sites are conflict-aware (i.e., frozen), no more operations can be in flight - because
if there were some, they would have been generated in a frozen state, which is not allowed."
[GC HB CC]
[GC HB CC own-site-ID]
(p ::synchronized?
(let [res (every? #(conflict-aware? GC % HB CC) (GC/get-site-IDs GC))]
(log "synchronized?" res)
res)))
(let [other-client-site-IDS (GC/get-other-client-site-IDs GC own-site-ID)
synchronized? (every? #(conflict-aware? GC % HB CC) other-client-site-IDS)]
(log (count other-client-site-IDS) "sites are" (if synchronized? "synchronized" "not synchronized"))
synchronized?)))

(defn conflict-descriptor
"A conflict descriptor contains information about a conflict situation.
Expand All @@ -47,7 +47,7 @@
in turn map from operations to another map, which maps from operations to conflicts.
The :metadata key maps from operation IDs to metadata, e.g., human-readable descriptions.
The :synchronized key maps to a boolean which indicates whether all sites are synchronized yet."
[MCGS CDAG HB CC GC]
[MCGS CDAG HB CC GC own-site-ID]
(p ::conflict-descriptor
(let [versions (reduce (fn [acc MCG]
(assoc acc (MOVIC/MCG-identifier MCG)
Expand All @@ -70,4 +70,4 @@
{:versions versions
:conflicts conflicts
:metadata metadata
:synchronized (synchronized? GC HB CC)})))
:synchronized (synchronized? GC HB CC own-site-ID)})))
11 changes: 11 additions & 0 deletions kernel/src/kernel/core/garbage_collector.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@
[GC]
(keys GC))

(defn get-other-client-site-IDs
"Returns all sites known to the local site but the server and local site.
This is done for synchronization, where the server need not be considered as it does not
participate in conflict resolution. Also, the server is not per se obliged to send heartbeats
as clients when they become conflict-aware.
The local site is, when synchronizing, already conflict-aware by definition."
[GC own-site-ID]
(->> (get-site-IDs GC)
(remove #(= % :server))
(remove #(= % own-site-ID))))

(defn get-site-VC
"Returns the garbage collector's most recently received vector clock for a given site.
This is useful to determine whether a site has succeeded some other vector clock."
Expand Down
4 changes: 1 addition & 3 deletions kernel/src/kernel/shell/client.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,7 @@
(swap! (*context* :MCGS) #(MOVIC/MOVIC % CO @(*context* :CDAG) @(*context* :HB) @(*context* :base-FM) (*context* :CC)))
; not required, just to be clear that this information is only needed by the MOVIC call
(swap! (*context* :CC) #(CC/with-most-recent % nil))
(let [next-FM (site/next-FM @(*context* :MCGS) @(*context* :CDAG) @(*context* :HB) @(*context* :CC) @(*context* :base-FM) @(*context* :GC))]
(reset! (*context* :FM) next-FM)
[next-FM CO]))))
[(site/next-FM!) CO])))

(defn generate-inverse-operation!
"**TODO**: Generates an inverse operation concurrent to all following operations.
Expand Down
28 changes: 18 additions & 10 deletions kernel/src/kernel/shell/site.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,23 @@
"Based on the result the MOVIC algorithm returned, checks whether
one CG was produced, in that case applying and returning the correct
feature model. Otherwise, returns a conflict descriptor."
[MCGS CDAG HB CC base-FM GC]
(if (= (count MCGS) 1)
(log "no conflict occured, producing feature model")
[MCGS CDAG HB CC base-FM GC own-site-ID]
(case (count MCGS)
0 (log "no operations submitted yet, producing feature model")
1 (log "no conflict occured, producing feature model")
(log "conflicts occured," (count MCGS) "maximum compatible groups created"))
(if (= (count MCGS) 1)
(topological-sort/apply-compatible* CDAG HB base-FM (first MCGS))
(conflict-resolution/conflict-descriptor MCGS CDAG HB CC GC)))
(case (count MCGS)
0 base-FM
1 (topological-sort/apply-compatible* CDAG HB base-FM (first MCGS))
(conflict-resolution/conflict-descriptor MCGS CDAG HB CC GC own-site-ID)))

(defn next-FM!
"Calculates the next feature model or conflict descriptor, stores it
in the context and returns it."
[]
(let [next-FM (next-FM @(*context* :MCGS) @(*context* :CDAG) @(*context* :HB) @(*context* :CC) @(*context* :base-FM) @(*context* :GC) (*context* :site-ID))]
(reset! (*context* :FM) next-FM)
next-FM))

(defn receive-operation!
"Receives an operation message at a site.
Expand All @@ -64,9 +74,7 @@
(swap! (*context* :CC) #(CC/with-most-recent % (CO/get-ID CO)))
(swap! (*context* :MCGS) #(MOVIC/MOVIC % CO @(*context* :CDAG) @(*context* :HB) @(*context* :base-FM) (*context* :CC)))
(swap! (*context* :CC) #(CC/with-most-recent % nil)) ; not required, just to be clear
(let [next-FM (next-FM @(*context* :MCGS) @(*context* :CDAG) @(*context* :HB) @(*context* :CC) @(*context* :base-FM) @(*context* :GC))]
(reset! (*context* :FM) next-FM)
next-FM))
(next-FM!))

(defn receive-heartbeat!
"Receives a heartbeat message at a site.
Expand All @@ -75,7 +83,7 @@
(log "receiving heartbeat message from" (message/get-site-ID message))
(swap! (*context* :VC) #(VC/_merge (VC/increment % (*context* :site-ID)) (message/get-VC message)))
(swap! (*context* :GC) #(GC/insert % (message/get-site-ID message) (message/get-VC message)))
@(*context* :FM))
(next-FM!))

(defn receive-leave!
"Receives a leave message at a site.
Expand Down

0 comments on commit 0493ad2

Please sign in to comment.