Skip to content

Commit

Permalink
FIX #4117 (#4124)
Browse files Browse the repository at this point in the history
* test: active$ should emit during replication

* FIX `active$` should emit during replication #4117

Co-authored-by: Max Nowack <[email protected]>
  • Loading branch information
pubkey and maxnowack authored Nov 10, 2022
1 parent be3fe7e commit 12cee0a
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

- ADD [replication-couchdb-new plugin](./docs-src/replication-couchdb-new.md) which can be used to replicate **any** [RxStorage](https://rxdb.info/rx-storage.html) with a CouchDB endpoint.
- ADD skip replication `retryTime` if `navigator.onLine` becomes `true`.
- FIX `active$` should emit during replication [#4117](https://github.com/pubkey/rxdb/pull/4117) Thanks [@maxnowack](https://github.com/maxnowack)
<!-- ADD new changes here! -->

<!-- /CHANGELOG NEWEST -->
Expand Down
19 changes: 12 additions & 7 deletions src/plugins/replication/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import {
BehaviorSubject,
combineLatest,
mergeMap,
Observable,
Subject,
Expand Down Expand Up @@ -296,18 +297,22 @@ export class RxReplicationState<RxDocType, CheckpointType> {
this.subs.push(
this.internalReplicationState.events.error.subscribe(err => {
this.subjects.error.next(err);
})
);
this.subs.push(
}),
this.internalReplicationState.events.processed.down
.subscribe(row => this.subjects.received.next(row.document as any))
);
this.subs.push(
.subscribe(row => this.subjects.received.next(row.document as any)),
this.internalReplicationState.events.processed.up
.subscribe(writeToMasterRow => {
this.subjects.send.next(writeToMasterRow.newDocumentState);
})
}),
combineLatest([
this.internalReplicationState.events.active.down,
this.internalReplicationState.events.active.up
]).subscribe(([down, up]) => {
const isActive = down || up;
this.subjects.active.next(isActive);
})
);

if (
this.pull &&
this.pull.stream$ &&
Expand Down
33 changes: 33 additions & 0 deletions test/unit/replication.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,39 @@ describe('replication.test.js', () => {
10
);

localCollection.database.destroy();
remoteCollection.database.destroy();
});
it('should emit active$ when a replication cycle is running', async () => {
const { localCollection, remoteCollection } = await getTestCollections({ local: 0, remote: 0 });
const replicationState = replicateRxCollection({
collection: localCollection,
replicationIdentifier: REPLICATION_IDENTIFIER_TEST,
live: true,
pull: {
handler: getPullHandler(remoteCollection)
},
push: {
handler: getPushHandler(remoteCollection)
}
});
replicationState.error$.subscribe(err => {
console.log('got error :');
console.dir(err);
throw err;
});

let wasActive = false;
replicationState.active$.subscribe((active) => {
if (active) wasActive = active
});

await replicationState.awaitInitialReplication();
assert.strictEqual(
wasActive,
true
);

localCollection.database.destroy();
remoteCollection.database.destroy();
});
Expand Down

0 comments on commit 12cee0a

Please sign in to comment.