Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tests for EDUs during fast joins #465

Merged
merged 21 commits into from
Oct 26, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
314 changes: 313 additions & 1 deletion tests/federation_room_join_partial_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ func TestPartialStateJoin(t *testing.T) {

// we should be able to send events in the room, during the resync
t.Run("CanSendEventsDuringPartialStateJoin", func(t *testing.T) {
// See https://github.com/matrix-org/synapse/issues/12997
t.Skip("Cannot yet send events during resync")
alice := deployment.RegisterUser(t, "hs1", "t3alice", "secret", false)

Expand All @@ -241,6 +242,317 @@ func TestPartialStateJoin(t *testing.T) {
t.Logf("Alice sent event event ID %s", eventID)
})

// we should be able to receive typing EDU over federation during the resync
t.Run("CanReceiveTypingDuringPartialStateJoin", func(t *testing.T) {
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")

server := createTestServer(t, deployment)
cancel := server.Listen()
defer cancel()
serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t))
psjResult := beginPartialStateJoin(t, server, serverRoom, alice)
defer psjResult.Destroy()

// Derek starts typing in the room.
derekUserId := psjResult.Server.UserID("derek")
content, _ := json.Marshal(map[string]interface{}{
"room_id": serverRoom.RoomID,
"user_id": derekUserId,
"typing": true,
})
edu := gomatrixserverlib.EDU{
Type: "m.typing",
Content: content,
}
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{}, []gomatrixserverlib.EDU{edu})

// Alice should be able to see that Derek is typing (even though HS1 is resyncing).
aliceNextBatch := alice.MustSyncUntil(t,
client.SyncReq{
Filter: buildLazyLoadingSyncFilter(nil),
},
client.SyncEphemeralHas(serverRoom.RoomID, func(result gjson.Result) bool {
if result.Get("type").Str != "m.typing" {
return false
}
user_ids := result.Get("content.user_ids").Array()
if len(user_ids) != 1 {
return false
}
return user_ids[0].Str == derekUserId
}),
)

// Alice should still be able to see incoming PDUs in the room during
// the resync; the earlier EDU shouldn't interfere with this.
// (See https://github.com/matrix-org/synapse/issues/13684)
event := psjResult.CreateMessageEvent(t, "charlie", nil)
serverRoom.AddEvent(event)
server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{event.JSON()}, nil)
aliceNextBatch = awaitEventViaSync(t, alice, serverRoom.RoomID, event.EventID(), aliceNextBatch)

// The resync completes.
psjResult.FinishStateRequest()

// Derek stops typing.
content, _ = json.Marshal(map[string]interface{}{
"room_id": serverRoom.RoomID,
"user_id": derekUserId,
"typing": false,
})
edu = gomatrixserverlib.EDU{
Type: "m.typing",
Content: content,
}
psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{}, []gomatrixserverlib.EDU{edu})

// Alice should be able to see that no-one is typing.
alice.MustSyncUntil(t,
client.SyncReq{
Filter: buildLazyLoadingSyncFilter(nil),
Since: aliceNextBatch,
},
client.SyncEphemeralHas(serverRoom.RoomID, func(result gjson.Result) bool {
return (result.Get("type").Str == "m.typing" &&
result.Get("content.user_ids.#").Int() == 0)
}),
)

})

// we should be able to receive presence EDU over federation during the resync
t.Run("CanReceivePresenceDuringPartialStateJoin", func(t *testing.T) {
// See https://github.com/matrix-org/synapse/issues/13008")
t.Skip("Presence EDUs are currently dropped during a resync")
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")

server := createTestServer(t, deployment)
cancel := server.Listen()
defer cancel()
serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t))
psjResult := beginPartialStateJoin(t, server, serverRoom, alice)
defer psjResult.Destroy()

derekUserId := psjResult.Server.UserID("derek")

content, _ := json.Marshal(map[string]interface{}{
"push": []map[string]interface{}{
map[string]interface{}{
"user_id": derekUserId,
"presence": "online",
"last_active_ago": 100,
},
},
})
edu := gomatrixserverlib.EDU{
Type: "m.presence",
Content: content,
}
psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{}, []gomatrixserverlib.EDU{edu})

alice.MustSyncUntil(t,
client.SyncReq{
Filter: buildLazyLoadingSyncFilter(nil),
},
func(userID string, sync gjson.Result) error {
for _, e := range sync.Get("presence").Get("events").Array() {
if e.Get("sender").Str == derekUserId {
return nil
}
}
return fmt.Errorf("No presence update from %s", derekUserId)
},
)

psjResult.FinishStateRequest()
})

// we should be able to receive to_device EDU over federation during the resync
t.Run("CanReceiveToDeviceDuringPartialStateJoin", func(t *testing.T) {
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")

server := createTestServer(t, deployment)
cancel := server.Listen()
defer cancel()
serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t))
psjResult := beginPartialStateJoin(t, server, serverRoom, alice)
defer psjResult.Destroy()

// Send a to-device message from Derek to Alice.
derekUserId := psjResult.Server.UserID("derek")
messageId := "hiezohf6Hoo7kaev"
content, _ := json.Marshal(map[string]interface{}{
"message_id": messageId,
"sender": derekUserId,
"type": "m.test",
"messages": map[string]interface{}{
alice.UserID: map[string]interface{}{
"*": map[string]interface{}{},
},
},
})
edu := gomatrixserverlib.EDU{
Type: "m.direct_to_device",
Content: content,
}
psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{}, []gomatrixserverlib.EDU{edu})

// Alice should see Derek's to-device message when she syncs.
alice.MustSyncUntil(t,
client.SyncReq{
Filter: buildLazyLoadingSyncFilter(nil),
},
func(userID string, sync gjson.Result) error {
for _, e := range sync.Get("to_device.events").Array() {
if e.Get("sender").Str == derekUserId &&
e.Get("type").Str == "m.test" {
return nil
}
}
return fmt.Errorf("No to_device update from %s", derekUserId)
},
)
psjResult.FinishStateRequest()
})

// we should be able to receive receipt EDU over federation during the resync
t.Run("CanReceiveReceiptDuringPartialStateJoin", func(t *testing.T) {
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")

server := createTestServer(t, deployment)
cancel := server.Listen()
defer cancel()
serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t))
psjResult := beginPartialStateJoin(t, server, serverRoom, alice)
defer psjResult.Destroy()

derekUserId := psjResult.Server.UserID("derek")

// Derek sends a read receipt into the room.
content, _ := json.Marshal(map[string]interface{}{
serverRoom.RoomID: map[string]interface{}{
"m.read": map[string]interface{}{
derekUserId: map[string]interface{}{
"data": map[string]interface{}{
"ts": 1436451550453,
},
"event_ids": []string{"mytesteventid"},
},
},
},
})
edu := gomatrixserverlib.EDU{
Type: "m.receipt",
Content: content,
}
psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{}, []gomatrixserverlib.EDU{edu})

// Alice should be able to see Derek's read receipt during the resync
alice.MustSyncUntil(t,
client.SyncReq{
Filter: buildLazyLoadingSyncFilter(nil),
},
client.SyncEphemeralHas(serverRoom.RoomID, func(result gjson.Result) bool {
if result.Get("type").Str != "m.receipt" {
return false
}

if result.Get("content").Get("mytesteventid").Get("m\\.read").Get(strings.Replace(derekUserId, ".", "\\.", -1)).Get("ts").Int() == 1436451550453 {
return true
}
return false
}),
)
psjResult.FinishStateRequest()
})

// we should be able to receive device list update EDU over federation during the resync
t.Run("CanReceiveDeviceListUpdateDuringPartialStateJoin", func(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit dubious about this test TBH, I don't think it tests anything particularly useful as is and may just cause confusion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't fully follow this either. Shouldn't the device list update be send to Alice, even if she and Derek aren't in an encrypted room?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As in: Alice should get told via /sync that Derek's device list has changed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Synapse currently, yes, though its a bit of a grey area and we've certainly talked about only sending device list updates for E2EE enabled devices and/or users who share rooms.

OTOH, I think all the current integration tests assume this, so I don't see a problem assuming it here

deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")

server := createTestServer(t, deployment)
cancel := server.Listen()
defer cancel()
serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t))
psjResult := beginPartialStateJoin(t, server, serverRoom, alice)
defer psjResult.Destroy()

derekUserId := psjResult.Server.UserID("derek")

content, _ := json.Marshal(map[string]interface{}{
"device_id": "QBUAZIFURK",
"stream_id": 1,
"user_id": derekUserId,
})
edu := gomatrixserverlib.EDU{
Type: "m.device_list_update",
Content: content,
}
aliceNextBatch := getSyncToken(t, alice)
psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{}, []gomatrixserverlib.EDU{edu})

// The resync completes.
psjResult.FinishStateRequest()

// Check that Alice is told that Derek's devices have changed.
// (Alice does not get told this during the resync, since we can't know
// for certain who is in that room until the resync completes.)
aliceNextBatch = alice.MustSyncUntil(
t,
client.SyncReq{
Filter: buildLazyLoadingSyncFilter(nil),
Since: aliceNextBatch,
},
func(clientUserID string, res gjson.Result) error {
matcher := match.JSONCheckOff(
"device_lists.changed",
[]interface{}{derekUserId},
func(r gjson.Result) interface{} { return r.Str },
nil,
)
return matcher([]byte(res.Raw))
},
)
})

// we should be able to receive signing key update EDU over federation during the resync
t.Run("CanReceiveSigningKeyUpdateDuringPartialStateJoin", func(t *testing.T) {
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")

server := createTestServer(t, deployment)
cancel := server.Listen()
defer cancel()
serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t))
psjResult := beginPartialStateJoin(t, server, serverRoom, alice)
defer psjResult.Destroy()

derekUserId := psjResult.Server.UserID("derek")

content, _ := json.Marshal(map[string]interface{}{
"user_id": derekUserId,
})
edu := gomatrixserverlib.EDU{
Type: "m.signing_key_update",
Content: content,
}
psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{}, []gomatrixserverlib.EDU{edu})

// If we want to check the sync we need to have an encrypted room,
// for now just check that the fed transaction is accepted.
})

// we should be able to receive events over federation during the resync
t.Run("CanReceiveEventsDuringPartialStateJoin", func(t *testing.T) {
alice := deployment.RegisterUser(t, "hs1", "t4alice", "secret", false)
Expand Down Expand Up @@ -2482,7 +2794,7 @@ func testReceiveEventDuringPartialStateJoin(
"GET",
[]string{"_matrix", "client", "v3", "rooms", psjResult.ServerRoom.RoomID, "state", "m.room.member", "@non-existent:remote"},
)

// check the server's idea of the state at the event. We do this by making a `state_ids` request over federation
stateReq = gomatrixserverlib.NewFederationRequest("GET", "hs1",
fmt.Sprintf("/_matrix/federation/v1/state_ids/%s?event_id=%s",
Expand Down