Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added method to check subscribers #171

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# frozen_string_literal: true

source 'https://rubygems.org'

# Specify your gem's dependencies in stomp.gemspec
gemspec
14 changes: 14 additions & 0 deletions lib/stomp/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,20 @@ def closed?()
@connection.closed?()
end

## Checks if subscriber with destination is passed is present or not
# @returns [Boolean]
#
# example
# Stomp::Client.new().subscriber?('/topic/topicName') => false
# Stomp::Client.new().subscriber?('/queue/queueName') => true
# Stomp::Client.new().subscriber?('/queue/Consumer.subscriber1.VirtualTopic.topicName') => true
#
def subscribed?(destination, headers = {})
headers = headers.symbolize_keys
headers = headers.merge(:id => build_subscription_id(destination, headers))
!@listeners[headers[:id]].nil?
end

# jruby? tests if the connection has detcted a JRuby environment
def jruby?()
@connection.jruby
Expand Down
7 changes: 7 additions & 0 deletions lib/stomp/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,13 @@ def subscribe(destination, headers = {}, subId = nil)
transmit(Stomp::CMD_SUBSCRIBE, headers)
end

def subscribed?(destination, subId = nil)
return false unless @reliable

subId = destination if subId.nil?
!@subscriptions[subId].nil?
end

# Unsubscribe from a destination. A subscription name is required.
# For Stomp 1.1+ a session unique subscription ID is also required.
def unsubscribe(destination, headers = {}, subId = nil)
Expand Down
63 changes: 41 additions & 22 deletions spec/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
require 'spec_helper'
require 'client_shared_examples'


describe Stomp::Client do
let(:null_logger) { double("mock Stomp::NullLogger") }

Expand Down Expand Up @@ -192,7 +191,7 @@
it_should_behave_like "standard Client"

end

describe "(created with authenticating stomp:// URL and non-TLD host)" do

before(:each) do
Expand Down Expand Up @@ -283,7 +282,7 @@
it_should_behave_like "standard Client"

end

describe "(created with failover URL)" do
before(:each) do
@client = Stomp::Client.new('failover://(stomp://login1:passcode1@localhost:61616,stomp://login2:passcode2@remotehost:61617)')
Expand Down Expand Up @@ -312,63 +311,63 @@
client = Stomp::Client.new(url)
expect(client.parameters).to eq(@parameters)
end

it "should properly parse a URL with failover:" do
url = "failover:(stomp://login1:passcode1@localhost:61616,stomp://login2:passcode2@remotehost1:61617,stomp://login3:passcode3@remotehost2:61618)"

@parameters[:hosts] = [
{:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false},
{:login => "login2", :passcode => "passcode2", :host => "remotehost1", :port => 61617, :ssl => false},
{:login => "login3", :passcode => "passcode3", :host => "remotehost2", :port => 61618, :ssl => false}
]

@parameters.merge!({:logger => null_logger})
@parameters[:client_main] = @cli_thread
expect(Stomp::Connection).to receive(:new).with(@parameters)
client = Stomp::Client.new(url)
expect(client.parameters).to eq(@parameters)
end

it "should properly parse a URL without user and password" do
url = "failover:(stomp://localhost:61616,stomp://remotehost:61617)"

@parameters[:hosts] = [
{:login => "", :passcode => "", :host => "localhost", :port => 61616, :ssl => false},
{:login => "", :passcode => "", :host => "remotehost", :port => 61617, :ssl => false}
]

@parameters.merge!({:logger => null_logger})
@parameters[:client_main] = @cli_thread
expect(Stomp::Connection).to receive(:new).with(@parameters)

client = Stomp::Client.new(url)
@parameters[:client_main] = client.parameters[:client_main]
expect(client.parameters).to eq(@parameters)
end

it "should properly parse a URL with user and/or password blank" do
url = "failover:(stomp://@localhost:61616,stomp://@remotehost:61617)"

@parameters[:hosts] = [
{:login => "", :passcode => "", :host => "localhost", :port => 61616, :ssl => false},
{:login => "", :passcode => "", :host => "remotehost", :port => 61617, :ssl => false}
]

@parameters.merge!({:logger => null_logger})
@parameters[:client_main] = @cli_thread
expect(Stomp::Connection).to receive(:new).with(@parameters)

client = Stomp::Client.new(url)
@parameters[:client_main] = client.parameters[:client_main]
expect(client.parameters).to eq(@parameters)
end

it "should properly parse a URL with the options query" do
query = "initialReconnectDelay=5000&maxReconnectDelay=60000&useExponentialBackOff=false&backOffMultiplier=3"
query += "&maxReconnectAttempts=4&randomize=true&backup=true&timeout=10000"

url = "failover:(stomp://login1:passcode1@localhost:61616,stomp://login2:passcode2@remotehost:61617)?#{query}"

#
@parameters = {
:initial_reconnect_delay => 5.0,
Expand All @@ -380,21 +379,21 @@
:connect_timeout => 0,
:reliable => true
}

@parameters[:hosts] = [
{:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false},
{:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false}
]

@parameters.merge!({:logger => null_logger})
@parameters[:client_main] = @cli_thread
expect(Stomp::Connection).to receive(:new).with(@parameters)

client = Stomp::Client.new(url)
@parameters[:client_main] = client.parameters[:client_main]
expect(client.parameters).to eq(@parameters)
end

end


Expand All @@ -407,7 +406,7 @@
message.command = Stomp::CMD_ERROR
message
end

it 'should handle ProducerFlowControlException errors by raising' do
expect do
@client = Stomp::Client.new
Expand Down Expand Up @@ -472,6 +471,27 @@ def original_headers
it_behaves_like 'argument-safe method'
end

describe '#subscribed?' do
subject { @client.subscribed?('/topic/topicName') }

context 'When subscription is present' do
before {
allow(@mock_connection).to receive(:subscribe)
@client.subscribe('/topic/topicName') { |message| }
}

it 'returns true' do
expect(subject).to be true
end
end

context 'When subscription is not present' do
it 'returns false' do
expect(subject).to be false
end
end
end

describe '#unsubscribe' do
let(:connection_headers) { original_headers.merge({:id => Digest::SHA1.hexdigest('destination')}) }
before {
Expand Down Expand Up @@ -563,6 +583,5 @@ def original_headers
it_behaves_like 'argument-safe method'
end
end

end
end