Sending And Receiving

Reading messages

reader_type provides a single function which can be used to read messages within a process.

  • std::optional<aether::message::message> reader_type::get_next()

    • returns the next available message from this reader contained within a std::optional. If no further messages are available, std::nullopt is returned. A call to get_next() invalidates any previous messages returned by this call.

In all circumstances, any messages not read from the reader will be discarded.

The following functions are defined on the returned message:

  • std::optional<uint64_t> get_source_user_id() const

    • if this is an interaction message sent from a client, this returns the ID that the client authenticated with. For messages from any other source, std::nullopt will be returned. Note that if this message came from a client, the payload is untrusted data so care must be taken to ensure that it is not processed in a way that could cause security issues.

  • bool is_source_global_state() const

    • returns true if the source of this message was the global state process.

  • std::optional<container::span<const char>> get_payload() const

    • Returns the binary payload of the message contained within a std::optional. The std::optional will always contain a value for messages read from a reader_type. The returned span is only valid for the lifetime of the message.

  • template<typename T> bool payload_as_pod(T &dest) const

    • This is a convenience function. If T is a trivially copyable type and has the same size as the message, then the data of the message will be byte copied into dest and true returned. Otherwise, false will be returned.

      The utility of this function is limited. In particular, it cannot be used to distinguish betwen messsages where the types are the same size. In addition, care must be taken to ensure that a client-generated event is never interpreted as a message generated internally within the simulation as this could have a security implications.

We provide a simple example of reading messages, using the receive_messages() function of a Simulate worker as an example. We demonstrate reading interaction events from players and also the global state. In order to inform Simulate that we want to receive events from players we also need to implement the get_topics function, which is described later on.

// A direction
enum direction : char { up, down, left, right };
// An event sent by the user
struct player_event {
    direction dir;
    float amount;
};
// A message internal to Aether
struct hello_world { };
void user_cell_state_impl::receive_messages(const aether_state_type &aether_state, 
        message_reader_type &reader) {
    while(auto maybe_message = reader.get_next()) {
        const auto &message = maybe_message.value();
        const auto maybe_player_id = message.get_source_user_id();
        if (maybe_player_id.has_value()) {
            // We know the message came from a client here
            const auto player_id = maybe_player_id.value();
            player_event event;
            if (message.payload_as_pod(event)) {
                // Validate untrusted client input
                if (!event_is_valid(event)) { continue; }
                // Move the player
                move_player(player_id, event.dir, event.amount);
            } else {
                AETHER_LOG(DEBUG)("Invalid event received from player ", player_id);
            }
        } else if (message.is_source_global_state()) {
            // We know the message came from the global state process here
            hello_world hello;
            if (message.payload_as_pod(hello)) {
                AETHER_LOG(INFO)("Hello world received from global state");
            } else {
                AETHER_LOG(DEBUG)("Unknown message received from global state");
            }
        } else {
            AETHER_LOG(DEBUG)("Message received from unexpected source");
        }
    }
}
auto user_cell_state_impl::get_topics(const aether_state_type& aether_state) -> 
        std::vector<subscriber_topic_type> {
    // Described later in the section on subscribing
}

Sending messages

writer_type provides functionality to set the destination of a message and the message content.

The following functions are provided:

  • void set_destination(const destination_type &dest)

    • Sets the message destination. dest may be a value of any of the destination types previously described and will be automatically converted to destination_type. This function should be called for all sent messages.

  • void push_bytes(const void *data, size_t len)

    • Appends len bytes from data to the current message payload. This function may be called multiple times to add more data to the same message.

  • int send()

    • Marks the message as ready to be sent and resets the writer to send a new message. Returns 0 if this was successful and a negative value otherwise. There are currently no non-fatal error error conditions so any erorr returned by this function should be treated as such.

We provide a simple example of a worker process sending a message to the global state:

struct hello_msg {
    // Unspecified data
};
void user_cell_state_impl::send_messages(const aether_state_type &aether_state, message_writer_type &writer) {
    writer.set_destination(aether::message::global_state {});
    hello_msg msg;
    writer.push_bytes(&msg, sizeof(msg));
    const auto ret = writer.send();
    assert(ret == 0 && "Message sending failed");
}

Subscribing to topics

In the section on receiving messages, we provided a small example of how to process an interaction event sent from a player. However, in order to ensure these are received, it is necessary for an Simulate worker to inform Simulate of which players it wants to receive events from. Since a Simulate simulation is distributed, it would be wasteful of bandwidth and compute to inform Simulate workers of events unrelated to them, which is why subscribing to these events is necessary.

To subscribe to topics, the get_topics function of the user-defined state type must be defined:

  • std::vector<subscriber_topic_type> get_topics(const aether_state_type& aether_state)

    • called by Simulate each tick to determine the list of topics that this worker is interested in. Messages sent to matching topics will be delivered to this worker if it has expressed an interest in them.

Assuming the existence of a function find_ids_of_players_in_cell() which returns a list of player IDs which control entities in the current cell, we can write get_topics() as follows:

auto user_cell_state_impl::get_topics(const aether_state_type& aether_state) -> 
        std::vector<subscriber_topic_type> {
    std::vector<subscriber_topic_type> interest;
    const std::vector<uint64_t> players_ids = find_ids_of_players_in_cell();
    for(const auto &id : player_ids) {
        interest.push_back(aether::message::topic::user_id{id});
    }
    return interest;
}

The ‘unclaimed’ topic

There are situations in which messages may be published to a topic for which there is no Aether process listening. A common example of this is when a new client joins a simulation and entities have not yet been spawned for that client to control. If workers only subscribe to events from clients which own entities in the cell they control, interaction events from the new client are ignored.

Subscribing to the unclaimed_events topic enables a process to request to receive messages that would otherwise be discarded. This provides a mechanism for new players to be discovered and appropriate actions to be taken. Typically, such logic would be placed in the Global state process. If the global state process chooses a spawn position, a message to spawn an entity in the chosen position can be sent to the closest worker using the closest_worker destination type.

Last updated