Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Only send message to each client once #63

Merged
merged 6 commits into from
Oct 22, 2013
Merged

Only send message to each client once #63

merged 6 commits into from
Oct 22, 2013

Conversation

davedoesdev
Copy link
Contributor

I verified it fixes MQTTAscoltatori too - will send that pull request once this one is merged.

Closes #62.

setTimeout(function ()
{
client1.disconnect();
}, 500);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setTimeout is bad and should be avoided in tests.
Do this instead:

  1. use QoS 1
  2. count the numer of "publish" events.
  3. call disconnect() on "puback", after verifying the number of events.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There will be one publish event. How do you know unexpected duplicates won't be received if you disconnect straight away?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you use QoS 1, PUBLISH packets will be sent to all subscribers before sending out a PUBACK to the sender. So, if QoS 1 is implemented correctly, all the duplicates will arrive before the PUBACK. If not, we have one more bug.

@mcollina
Copy link
Collaborator

the mosca.trail[uid] is never deleted. It is a damn memory leak that we cannot afford.

Using a per-server message counter might be easier and safe. Example algorithm:

  1. every message is added a _dedupId, generated by incrementing the counter.
  2. every client memorize the last _dedupId transmitted.

Ideally the counter should be cyclic at Math.pow(2, 32), but I think a clever solution can be found later.

@davedoesdev
Copy link
Contributor Author

mosca.trail[uid] will be deleted when the message is deleted. Does mosca keep the message in memory forever?

In the _dedupId scheme, what if message delivery is intermingled? Doesn't that constrain mosca so that it will always have to send one message before it sends another?

@mcollina
Copy link
Collaborator

mosca.trail[uid] will be deleted when the message is deleted. Does mosca keep the message in memory forever?

Nope, you are right, sorry. I thought that would be global, as the options are not passed through all the ascoltatori backends. If it's bound to the message, then multiple Mosca sharing the same 'parent' broker will have the same problem. options can be undefined.

In the _dedupId scheme, what if message delivery is intermingled? Doesn't that constrain mosca so that it will always have to send one message before it sends another?

Nope, the counter is incremented only when a message enter the current Mosca instance (in the first forward callback, you can add the _dedupId to the topic itself). From that, all is in-order, e.g. calling emit('aaa', 1); emit('aaa', 2) will emit 1 and then 2.

@davedoesdev
Copy link
Contributor Author

Can't modify topic - can't set property on primitives:

> wup = "foo"
'foo'
> wup._foo = 90
90
> wup
'foo'
> wup._foo
undefined

@davedoesdev
Copy link
Contributor Author

The other thing I thought is it means each ascoltatore has to call all callbacks registered for a topic before it handles any other messages.

@davedoesdev davedoesdev reopened this Oct 19, 2013
@mcollina
Copy link
Collaborator

Ok, then we need to ensure that in Ascoltatori the options object is always present, even if it's empty, and add the _dedupId there.
Do you think it's better to do that in Ascoltatori or in Mosca?
If we do it here, we might even create an AddDedupIdAscoltatore that wraps the internal ascoltatore.

@davedoesdev
Copy link
Contributor Author

AddDedupIdAscoltatore is a good idea.

@mcollina
Copy link
Collaborator

Would you like to add that to the PR?

@davedoesdev
Copy link
Contributor Author

Sure. Mosca or Ascoltatori do you think?

@davedoesdev
Copy link
Contributor Author

Think Ascoltatori is tidiest. Adding AddDedupIdAscoltatore as a DecoratorAscoltatore - might be generally useful to projects other than Mosca for recognising duplicates.

@mcollina
Copy link
Collaborator

Yes. Let ensure there is always an 'option' obj for each message delivered.

Il giorno sabato 19 ottobre 2013, David Halls ha scritto:

Think Ascoltatori is tidiest.


Reply to this email directly or view it on GitHubhttps://github.com//pull/63#issuecomment-26656564
.

@davedoesdev
Copy link
Contributor Author

Actually, AddDedupIdAscoltatore isn't right because messages are published from inside ascoltatori themselves, onto TrieAscoltatore. However, we can modify _setPublish to it fills in options as {} if not passed (instead of undefined).

@davedoesdev
Copy link
Contributor Author

So that works but what if the same options object is used for every message? Do we really want to be making a shallow copy (or using Object.create) for every publish or can we assume fresh options for each publish?

…ing a

message to prevent same message being sent twice to a connection.
Assumes:
- options is always an object and never null
- fresh options object is minted for each message
@davedoesdev
Copy link
Contributor Author

OK so I pushed what I have. I tested ascoltatori against it.

@davedoesdev
Copy link
Contributor Author

I still need to fix the test

@davedoesdev
Copy link
Contributor Author

Weird - travis build worked on 0.10 but not on 0.8

@davedoesdev
Copy link
Contributor Author

Hmm, repro'd failure on 0.8. Strange...
client1 doesn't get the publish

@mcollina
Copy link
Collaborator

very weird. I don't exactly now what's going on.. only that supporting 0.8 is really hard.

@davedoesdev
Copy link
Contributor Author

The MQTTConnection gets and emits publish but it doesn't reach the test app. The publish is also received before the other connection gets and emits puback, which is as expected.

@davedoesdev
Copy link
Contributor Author

Interesting- added a log to readable-stream.Writable.emit (intercepted it) and for that test, it thinks the registered function for publish is:

function (packet) {
    that.setUpTimer();
    packet.topic = rewriteTopic(packet.topic);
    that.server.authorizePublish(that, packet.topic, packet.payload, function(err, success) {
      that.handleAuthorizePublish(err, success, packet);
    });
  }

@davedoesdev
Copy link
Contributor Author

I don't really understand that - from the code, it looks like the tests are using a MqttConnection and not a Client.
Clearly Node 0.10 gets that right but for some reason 0.8 doesn't.
I think that might be the server connection which is displaying, in which case it's not getting to the client connection.

@davedoesdev
Copy link
Contributor Author

I think the problem is on node 0.8, mqtt uses readable-stream which is flushing (or rather, not) differently. We don't wait for the stream to flush (neither does mqtt). I don't think we need to alter the code of mosca (or mqtt) since we don't actually care that it's gone. However, the test gets puback straight away and so then disconnects. (The data hasn't yet been flushed).

@davedoesdev
Copy link
Contributor Author

A setTimeout in the test fixes it :)
i.e. wait for the data to be flushed. I can't find a flush method in readable-stream - I don't think we expose the server connection anyway.

mqtt uses readable-stream which doesn't send data straight away.
So wait before disconnecting so the data gets chance to go.
@davedoesdev
Copy link
Contributor Author

OK, build now passes.

@@ -168,6 +169,13 @@ Client.prototype.actualSend = function(packet, retry) {
Client.prototype._buildForward = function() {
var that = this;
this.forward = function(topic, payload, options, subTopic, initialQoS, cb) {
if (options._dedupId === undefined) {
options._dedupId = that.server.nextDedupId();
} else if (options._dedupId === that._lastDedupId) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a <=, not a ===.

@davedoesdev
Copy link
Contributor Author

@mcollina please review. I couldn't really think of an alternative to setTimeout for the test in order to let the data flush.
Also, the dedupId solution does assume that ascolatori don't defer calls to any of the callbacks on publish (if they do, the messages and dedupIds will become intermingled.

@davedoesdev
Copy link
Contributor Author

Ah just missed your review, fixing...

@mcollina
Copy link
Collaborator

It's a right assumption. The callback is not deferred, otherwise the performance will just grow too much.

@davedoesdev regarding the puback, I got the problem. How about using two clients? One for publishing and one for subscribing? That should remove the setTimeout.

@davedoesdev
Copy link
Contributor Author

:( It does use two clients. Client 1 subscribes, client 2 publishes. The problem is that puback is returned after sending data to client 1's stream. But readable-stream doesn't seem to flush the data right away.

      var called = 0;

      client1.on("publish", function(packet) {
        expect(packet.topic).to.equal("a/b");
        expect(packet.payload).to.equal("some other data");
        if (!packet.dup) {
          expect(called).to.equal(0);
          called++;
        }
      });

      client1.on("suback", function() {
        buildAndConnect(d, function(client2) {

          client2.on("puback", function() {
            setTimeout(function () {
              expect(called).to.equal(1);
              client1.disconnect();
              client2.disconnect();
            }, 1000);
          });

          client2.publish({
            topic: "a/b",
            payload: "some other data",
            messageId: messageId,
            qos: 1
          });
        });
      });

      client1.subscribe({
        subscriptions: subscriptions,
        messageId: messageId
      });

@mcollina
Copy link
Collaborator

I'm taking it from now and see if I can remove the damn setTimeout.

@davedoesdev
Copy link
Contributor Author

Thanks. I ended up in the readable-stream module source.

@davedoesdev
Copy link
Contributor Author

I guess one way would be to get client1 to send a message back once it's received it. However, this wouldn't give time for any duplicates to be registered.
I pressed the close button by mistake again.
Anyway, over to you...

@davedoesdev davedoesdev reopened this Oct 22, 2013
@mcollina mcollina merged commit b59252d into moscajs:master Oct 22, 2013
@mcollina
Copy link
Collaborator

Ok, merged, it's fixed now, without a setTimeout. The trick was not checking called before disconnecting.
The disconnect method in MQTT.js sends a MQTT disconnect packet, it does not close the connection straight away, but let the server handle it (so it can receive in-flight packets).

@mcollina
Copy link
Collaborator

Thank again! This was very important (and long to debug).

@davedoesdev
Copy link
Contributor Author

Yes, that works.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Overlapped topics
2 participants