Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cannot read a topic (topic written using mop) continuously. #1501

Open
vishnu-illikkal opened this issue Oct 15, 2024 · 0 comments
Open

cannot read a topic (topic written using mop) continuously. #1501

vishnu-illikkal opened this issue Oct 15, 2024 · 0 comments

Comments

@vishnu-illikkal
Copy link

vishnu-illikkal commented Oct 15, 2024

Describe the bug
When i created a pulsar topic using mop and when trying to consume that topic it consumes only till the last stored data and not the new data's published to the topic.

To Reproduce
Steps to reproduce the behavior:

  1. setup
  • Below is the pulsar.service file for Raspberry Pi 4
[Unit]
Description=Apache Pulsar Standalone
After=network.target

[Service]
Type=simple
User=vishnuip
ExecStart=/home/vishnuip/apache-pulsar-3.0.7/bin/pulsar standalone
Restart=on-failure
WorkingDirectory=/home/vishnuip/apache-pulsar-3.0.7

[Install]
WantedBy=multi-user.target
  • Below is the pulsar.service file for Ubuntu
[Unit]
Description=Pulsar Broker
After=network.target

[Service]
Type=simple
ExecStart=/home/ubuntu/apache-pulsar-3.0.7/bin/pulsar standalone
WorkingDirectory=/home/ubuntu/apache-pulsar-3.0.7
RestartSec=20s
Restart=on-failure


[Install]
WantedBy=multi-user.target
git clone https://github.com/streamnative/mop.git
cd mop
mvn clean install -DskipTests
mkdir /home/ubuntu/apache-pulsar-3.0.7/protocols
cp /home/ubuntu/mop/mqtt-impl/target/pulsar-protocol-handler-mqtt-3.4.0-SNAPSHOT.nar /home/ubuntu/apache-pulsar-3.0.7/protocols/
nano /home/ubuntu/apache-pulsar-3.0.7/conf/standalone.conf
  1. Added this to the standalone.conf file
messagingProtocols=mqtt
protocolHandlerDirectory=/home/ubuntu/apache-pulsar-3.0.7/protocols
mqttListeners=mqtt://127.0.0.1:1883
advertisedAddress=127.0.0.1
  1. restart pulsar
sudo systemctl restart pulsar
sudo systemctl status pulsar
sudo ss -tulnp | grep 1883
  • when the port is opened it will show something like
tcp   LISTEN 0      4096         0.0.0.0:1883       0.0.0.0:*    users:(("java",pid=6044,fd=471))   
  1. now a python script is used to send messages to the mqtt broker
import random
import time
import json
import paho.mqtt.client as mqtt


def send_data_to_mqtt(broker: str, port: int, topic: str,
                      message: str) -> None:
    """
    Sends a message to a specified MQTT topic.

    Parameters:
        broker (str): The MQTT broker address.
        port (int): The port number for the MQTT broker.
        topic (str): The MQTT topic to which the message will be sent.
        message (str): The message to be sent to the MQTT topic.

    Returns:
        None
    """
    # Create an MQTT client instance
    client = mqtt.Client()

    # Connect to the MQTT broker
    client.connect(broker, port)

    # Publish the message to the specified topic
    client.publish(topic, message)

    # Disconnect from the broker
    client.disconnect()


# Example usage
if __name__ == "__main__":
    broker_address = "192.168.1.21"
    broker_port = 1883  # Default MQTT port
    # mqtt_topic = "persistent://public/default/vishnu-mqtt-topic"
    mqtt_topic = "vishnu-mqtt-topic"

    while True:
        data = {
            "key": random.randint(0, 100)
        }
        data_to_send = json.dumps(data)
        # data_to_send = data_to_send + str(random.randint(0, 100))
        send_data_to_mqtt(broker_address, broker_port, mqtt_topic, data_to_send)
        print(json.dumps(data))
        time.sleep(1)
  • This sends data continuously without any problem
  1. Now i created a consumer using py
import pulsar
import time

from pulsar import Consumer


def create_consumer(client: pulsar.Client, topic: str, subscription_name: str) -> pulsar.Consumer:
    """Creates a Pulsar consumer and returns it."""
    return client.subscribe(
        topic=topic,
        subscription_name=subscription_name,
        consumer_type=pulsar.ConsumerType.Shared,  # Exclusive for testing
        initial_position=pulsar.InitialPosition.Earliest,
    )


def reconnect(client: pulsar.Client, topic: str, subscription_name: str) -> Consumer | None:
    """Attempts to reconnect and recreate the consumer."""
    try:
        print("Attempting to reconnect...")
        time.sleep(5)  # Avoid spamming reconnection attempts
        return create_consumer(client, topic, subscription_name)
    except Exception as e:
        print(f"Reconnection failed: {e}")
        return None


def consume_messages(consumer: pulsar.Consumer, client: pulsar.Client, topic: str, subscription_name: str):
    """Handles the consumption of messages, including connection handling and reconnection logic."""
    while True:
        msg = None
        try:
            # Check the connection and receive a message with timeout
            if consumer.is_connected():
                msg = consumer.receive(timeout_millis=5000)  # Timeout of 5 seconds
                # consumer.close()
                # print(msg.data().decode('utf-8'))
            else:
                consumer = reconnect(client, topic, subscription_name)
                print("Connection lost")

            if msg:
                # Decode and process the message
                message_data = msg.data().decode('utf-8')
                message_id = msg.message_id()

                print(f"\n\nReceived message: '{message_data}' \nid: '{message_id}'")

                # Acknowledge successful processing
                consumer.acknowledge(msg)
            else:
                print("No new messages within the timeout period. Waiting...")

        except pulsar.Timeout:
            # Handle timeout and continue listening
            print("Timeout occurred. No new messages available. Checking again...")
            # print(msg.data().decode('utf-8'))

        except pulsar.ConnectError as e:
            # Handle connection error and attempt to reconnect
            print(f"Connection error: {e}")
            consumer = reconnect(client, topic, subscription_name)
            if consumer is None:
                # Exit if reconnection fails
                break

        except Exception as e:
            print(f"Error processing message: {e}")
            if msg:
                consumer.negative_acknowledge(msg)

        except KeyboardInterrupt:
            print("KeyboardInterrupt detected. Exiting...")


def main():
    # Initialize Pulsar client
    client = pulsar.Client('pulsar://192.168.1.21:6650')

    # Define topic and subscription
    topic = "persistent://public/default/vishnu-mqtt-topic"
    subscription_name = 'pulsar-mqtt123-testing-subscription'

    # Create initial consumer
    consumer = create_consumer(client, topic, subscription_name)

    # Consume messages
    consume_messages(consumer, client, topic, subscription_name)

    # Close the client connection upon exiting
    print("Closing client connection...")
    client.close()


if __name__ == "__main__":
    main()
  1. Now this consumer reads data till whatever is available and not new data
  • i have attached a screenshot of the responses from the 2 py scripts when running
  • We can see that the {"key": 11} is the last one received, but there is still new value's getting added to the topic.

Expected behavior
Should be able to consume messages continuously from the topic created using mop.

Screenshots
image

Desktop (please complete the following information):

  • Raspberry Pi 4
Distributor ID: Debian
Description:    Debian GNU/Linux 12 (bookworm)
Release:        12
Codename:       bookworm
  • Ubuntu
Distributor ID: Ubuntu
Description:    Ubuntu 22.04.4 LTS
Release:        22.04
Codename:       jammy
  • Python scripts are ran in the windows 11 pc

Additional context

  • Only has the problem with the mop topics, other topics have no problem.
  • I have used the above 2 systems Raspberry Pi 4 and Ubuntu to test this problem, but both had same problem.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant