lucko edited this page Mar 29, 2021 · 6 revisions

helper provides a Messenger abstraction utility, which consists of a few key classes.

  • Messenger - an object which manages messaging Channels
  • Channel - represents an individual messaging channel. Facilitates sending a message to the channel, or creating a ChannelAgent
  • ChannelAgent - an agent for interacting with channel messaging streams. Allows you to add/remove ChannelListeners to a channel
  • ChannelListener - an object listening to messages sent on a given channel

The system is easy to use and cuts out much of the boilerplate code that usually goes along with PubSub systems.

As an example, here is a super simple global player messaging system.

// get the Messenger
Messenger messenger = getService(Messenger.class);

// Define the channel data model.
class PlayerMessage {
    UUID uuid;
    String username;
    String message;

    public PlayerMessage(UUID uuid, String username, String message) {
        this.uuid = uuid;
        this.username = username;
        this.message = message;
    }
}

// Get the channel
Channel<PlayerMessage> channel = messenger.getChannel("pms", PlayerMessage.class);

// Listen for chat events, and send a message to our channel.
Events.subscribe(AsyncPlayerChatEvent.class, EventPriority.HIGHEST)
        .filter(Events.DEFAULT_FILTERS.ignoreCancelled())
        .handler(e -> {
            e.setCancelled(true);
            channel.sendMessage(new PlayerMessage(e.getPlayer().getUniqueId(), e.getPlayer().getName(), e.getMessage()));
        });

// Get an agent from the channel.
ChannelAgent<PlayerMessage> channelAgent = channel.newAgent();
channelAgent.register(this);

// Listen for messages sent on the channel.
channelAgent.addListener((agent, message) -> {
    Schedulers.sync().run(() -> {
        Bukkit.broadcastMessage("Player " + message.username + " says " + message.message);
    });
});

You can either integrate Messenger into your own existing messaging system (using AbstractMessenger), or use helper-redis, which implements Messenger using Jedis and the Redis PubSub system.
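To make the integration point more concrete, here is a minimal sketch of the publish/subscribe shape that any underlying transport has to offer: send raw bytes to a named channel, and hand incoming bytes to whoever subscribed. `LoopbackTransport` and its methods are hypothetical names invented for this illustration and are not part of helper. A real integration would bridge something like this to AbstractMessenger, or simply use helper-redis.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-process transport: not helper API, just the publish/subscribe
// shape that a Messenger implementation sits on top of.
final class LoopbackTransport {
    private final Map<String, List<Consumer<byte[]>>> subscribers = new ConcurrentHashMap<>();

    // Register a callback for raw payloads arriving on the named channel.
    void subscribe(String channel, Consumer<byte[]> callback) {
        subscribers.computeIfAbsent(channel, k -> new CopyOnWriteArrayList<>()).add(callback);
    }

    // Remove every callback registered for the named channel.
    void unsubscribe(String channel) {
        subscribers.remove(channel);
    }

    // Deliver a raw payload to every current subscriber of the channel.
    // Returns the number of callbacks that received it.
    int publish(String channel, byte[] payload) {
        List<Consumer<byte[]>> callbacks =
                subscribers.getOrDefault(channel, Collections.emptyList());
        for (Consumer<byte[]> callback : callbacks) {
            callback.accept(payload);
        }
        return callbacks.size();
    }
}

final class LoopbackDemo {
    // Publishes two messages, unsubscribing in between, and returns what was received.
    static List<String> run() {
        LoopbackTransport transport = new LoopbackTransport();
        List<String> received = new ArrayList<>();
        transport.subscribe("pms",
                bytes -> received.add(new String(bytes, StandardCharsets.UTF_8)));

        transport.publish("pms", "hello".getBytes(StandardCharsets.UTF_8));
        transport.unsubscribe("pms");
        transport.publish("pms", "dropped".getBytes(StandardCharsets.UTF_8));
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Only the first message reaches the subscriber, since the second is published after the channel has been unsubscribed. Encoding typed messages to and from the raw payload is the part the Messenger abstraction takes care of.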

Conversations

helper also provides an additional abstraction for working with "conversation channels" - where sent messages require some sort of reply.

public class PrivateMessageSystem {
    private final ConversationChannel<PrivateMessage, PrivateMessageReply> channel;

    public PrivateMessageSystem(Messenger messenger) {
        this.channel = messenger.getConversationChannel("private-messages", PrivateMessage.class, PrivateMessageReply.class);

        ConversationChannelAgent<PrivateMessage, PrivateMessageReply> channelAgent = this.channel.newAgent();
        channelAgent.addListener((agent, message) -> {
            Promise<PrivateMessageReply> reply = Schedulers.sync().supply(() -> {
                Player player = Bukkit.getPlayerExact(message.to);
                if (player == null) {
                    return null;
                }

                player.sendMessage("You got a message from " + message.from + " saying " + message.message);
                return new PrivateMessageReply(message.getConversationId(), player.getName());
            });
            
            return ConversationReply.ofPromise(reply);
        });
    }

    public void sendMessage(Player from, String to, String message) {

        // create a new message object
        PrivateMessage pm = new PrivateMessage(from.getUniqueId(), message, to);

        this.channel.sendMessage(pm, new ConversationReplyListener<PrivateMessageReply>() {
            @Nonnull
            @Override
            public RegistrationAction onReply(@Nonnull PrivateMessageReply reply) {
                from.sendMessage("Your message was delivered successfully to " + reply.deliveredTo);
                return RegistrationAction.STOP_LISTENING;
            }

            @Override
            public void onTimeout(@Nonnull List<PrivateMessageReply> replies) {
                from.sendMessage("Unable to deliver your message to " + to);
            }
        }, 2, TimeUnit.SECONDS);
    }

    private static final class PrivateMessage implements ConversationMessage {
        private final UUID conversationId;
        private final UUID from;
        private final String message;
        private final String to;

        private PrivateMessage(UUID from, String message, String to) {
            this.conversationId = UUID.randomUUID();
            this.from = from;
            this.message = message;
            this.to = to;
        }

        @Nonnull
        @Override
        public UUID getConversationId() {
            return this.conversationId;
        }
    }

    private static final class PrivateMessageReply implements ConversationMessage {
        private final UUID conversationId;
        private final String deliveredTo;

        private PrivateMessageReply(UUID conversationId, String deliveredTo) {
            this.conversationId = conversationId;
            this.deliveredTo = deliveredTo;
        }

        @Nonnull
        @Override
        public UUID getConversationId() {
            return this.conversationId;
        }
    }

}
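The conversation ID is what ties a reply back to the message that prompted it, which is why PrivateMessageReply above is constructed with message.getConversationId(). The following is a minimal sketch of that correlation idea in plain Java. It assumes nothing about how helper implements this internally: `PendingReplies` is a hypothetical class, and the JDK's `CompletableFuture` stands in for the reply listener.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical class (not part of helper): tracks messages awaiting a reply,
// keyed by conversation ID.
final class PendingReplies<R> {
    private final Map<UUID, CompletableFuture<R>> pending = new ConcurrentHashMap<>();

    // Called when a message is sent: remember that a reply is expected.
    CompletableFuture<R> expectReply(UUID conversationId) {
        CompletableFuture<R> future = new CompletableFuture<>();
        pending.put(conversationId, future);
        return future;
    }

    // Called when a reply arrives: complete the matching future, if any.
    // Returns false if the conversation is unknown or already finished.
    boolean onReply(UUID conversationId, R reply) {
        CompletableFuture<R> future = pending.remove(conversationId);
        return future != null && future.complete(reply);
    }

    // Called when the wait window elapses: stop waiting for this conversation.
    boolean onTimeout(UUID conversationId) {
        CompletableFuture<R> future = pending.remove(conversationId);
        return future != null && future.cancel(false);
    }
}

final class PendingRepliesDemo {
    static String run() {
        PendingReplies<String> replies = new PendingReplies<>();
        UUID id = UUID.randomUUID();
        CompletableFuture<String> reply = replies.expectReply(id);

        // A reply for some other conversation is ignored.
        replies.onReply(UUID.randomUUID(), "wrong conversation");
        // The reply carrying the same ID completes the future.
        replies.onReply(id, "Notch");
        return reply.join();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "Notch"
    }
}
```

A reply that arrives after the timeout finds no pending entry and is dropped, which roughly mirrors the onReply/onTimeout split in the listener above.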

ReqResp

ReqRespChannels are an extended abstraction over ConversationChannels which make it super easy to send and receive requests over the network.

For example, suppose you had a core "hub" server which held player balances in memory, and lots of separate "minigame" servers on which players sometimes needed to make purchases.

(Of course, in this situation you should probably store balances in a database. It's just an example.)

First, set up a channel. This process is the same for both the hub and minigame servers.

class WithdrawRequest {
    final UUID playerId;
    final int amount;

    WithdrawRequest(UUID playerId, int amount) {
        this.playerId = playerId;
        this.amount = amount;
    }
}

ReqRespChannel<WithdrawRequest, Boolean> channel = messenger.getReqRespChannel(
        "eco-withdraw", WithdrawRequest.class, Boolean.class
);

On the hub server, register a response handler:

Map<UUID, Integer> balances = new HashMap<>();
channel.responseHandler(request -> {
    int balance = balances.getOrDefault(request.playerId, 0);
    if (balance - request.amount >= 0) {
        // pls don't ever implement economy like this lol
        balances.put(request.playerId, balance - request.amount);
        return true;
    } else {
        return false;
    }
});
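The handler above reads the balance and then writes it back in two separate steps, which is only safe if requests are never handled concurrently. Whether helper invokes response handlers on more than one thread is not stated here, so treat the following as a defensive sketch rather than a required fix. The `Balances` class is hypothetical. It uses `ConcurrentHashMap.compute` so that the check and the update happen as one atomic step.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical in-memory balance store, for illustration only.
final class Balances {
    private final Map<UUID, Integer> balances = new ConcurrentHashMap<>();

    void set(UUID playerId, int amount) {
        balances.put(playerId, amount);
    }

    int get(UUID playerId) {
        return balances.getOrDefault(playerId, 0);
    }

    // Withdraws the amount if the balance covers it.
    // The check and the update run atomically for the given player.
    boolean withdraw(UUID playerId, int amount) {
        if (amount < 0) {
            return false; // a negative withdrawal would add money
        }
        AtomicBoolean success = new AtomicBoolean(false);
        balances.compute(playerId, (id, current) -> {
            int balance = current == null ? 0 : current;
            if (balance >= amount) {
                success.set(true);
                return balance - amount;
            }
            return current; // unchanged (null keeps an absent entry absent)
        });
        return success.get();
    }
}

final class BalancesDemo {
    public static void main(String[] args) {
        Balances balances = new Balances();
        UUID player = UUID.randomUUID();
        balances.set(player, 150);
        System.out.println(balances.withdraw(player, 100)); // prints "true"
        System.out.println(balances.withdraw(player, 100)); // prints "false"
        System.out.println(balances.get(player));           // prints "50"
    }
}
```

With a store like this, the response handler body would reduce to a single call such as `balances.withdraw(request.playerId, request.amount)`.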

And on the minigame server, make a request:

Promise<Boolean> response = channel.request(new WithdrawRequest(player.getUniqueId(), 100));
response.thenAcceptAsync(res -> {
    if (res) {
        // TODO: reward player purchase
    } else {
        player.sendMessage("Sorry, you don't have enough money!");
    }
});

Timeouts are handled automatically - the Promise will complete exceptionally with a TimeoutException if a reply is not received after 5 seconds.
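Because a timed-out request surfaces as an exceptional completion rather than a `false` result, callers will usually want a fallback for that case. The sketch below shows the pattern with the JDK's `CompletableFuture` standing in for helper's Promise. That substitution is an assumption made so the example runs without helper. The `TimeoutFallback` class and the 100 ms timeout are illustrative only, and `orTimeout` requires Java 9 or newer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical utility, not part of helper.
final class TimeoutFallback {
    // Treats a timed-out response as "not approved"; other failures still propagate.
    static CompletableFuture<Boolean> withFallback(CompletableFuture<Boolean> response) {
        return response.exceptionally(error -> {
            // Failures from earlier stages may arrive wrapped in CompletionException.
            Throwable cause = error instanceof CompletionException && error.getCause() != null
                    ? error.getCause()
                    : error;
            if (cause instanceof TimeoutException) {
                return false;
            }
            throw new CompletionException(cause);
        });
    }

    public static void main(String[] args) {
        // A response that never arrives: fails with TimeoutException after 100 ms.
        CompletableFuture<Boolean> never = new CompletableFuture<Boolean>()
                .orTimeout(100, TimeUnit.MILLISECONDS);
        System.out.println(withFallback(never).join()); // prints "false"
    }
}
```

Mapping a timeout to `false` means the purchase is simply not granted when the hub is unreachable, which is the conservative choice for the withdraw example.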

Of course, the request/response types can be whatever you want. As long as Gson can serialize them, you're good!