Links
Comment on page

Muxer Server Plugin

Overview

Hadean Connect's Muxer Server Plugin is a library designed to handle the communication with a Connect cluster. It can be embedded into your server application to provide an API that allows data to be serialised and transmitted to Connect for distribution to your connected clients.
Muxer Server Plugin is only necessary when not using Hadean Simulate as your upstream simulation, because Simulate comes preconfigured with Connect connectivity
Multiple servers can connect to a single Connect cluster, these servers are considered as "upstream" as they are the source of data that later be received by the Muxer Client Plugin. Connect will aggregate data from all of the upstream servers sending out relevant data to each connected client. Connect is also able to receive data sent from Connect Clients. This is typically used to provide information about the connected client, such as the location of a character they are controlling within a simulation.
In order to connect to a Connect cluster it is necessary to:
  1. 1.
    Source and initialise the Muxer Server Plugin libraries in your application with details of the Connect cluster
  2. 2.
    Identify which server instance is sending the data
  3. 3.
    Write data to Connect
  4. 4.
    Read data from Connect

1. Initialising Muxer Server Plugin Code

The Muxer Server Plugin libraries are included in Hadean's Simulate SDK, with versions available for Windows and Linux, and additional version is included to support running in Unreal Engine on Linux
In your code you will need to include the server.hh file and will need to link to the relevant libmuxer-server.a file for your target build environment.
muxer-server
├── include
│   └── muxer
│   ├── client_message.hh
│   ├── detail
│   │   ├── client_writer.hh
│   │   └── threaded_multi_linuxfd_writer.hh
│   ├── server.hh
│   └── worker_id.hh
└── lib
├── linux
│   └── libmuxer-server.a
├── linux-unreal
│   └── libmuxer-server.a
└── win64
└── libmuxer-server.a
With this available it will then be possible to create endpoints for both reading and writing to Connect. This is done by creating instances of both a muxer::reader and muxer::writer. These instances are used to handle communication with the Muxer.
With the reader and writer created it will then be necessary to register the locations of the available Connect nodes, which may be different for reading and writing, but for most applications tend to be the same. Connect locations need to be passed to the reader and writer as instances of the aether::tcp::service_location type which stores two properties, the IP and the PORT for a Connect Node.
#include <muxer/server.hh>
...
// Declare end points for reading and writting to Muxers
muxer::writer writer = {};
muxer::reader reader= {};
// Create service_location details for each muxer node and populate
aether::tcp::service_location muxer_address = {};
strncpy(muxer_address.host, "127.0.0.1", sizeof(muxer_address.host));
strncpy(muxer_address.port, "8880", sizeof(muxer_address.port));
// Add details of Muxer locations to reader and writter
writer.add_location(muxer_address);
reader.add_location(muxer_address);
Note that it is good practice to lock down access to the upstream Muxer port when configuring your clusters. To do this you will need to restrict which IP addresses can access port 8880. See configuration for more details

2. Setting Server Identity (optional)

When multiple servers are sending data to a Connect cluster it is often useful to know which server has sent what data. This can be done by registering an ID for the server. The ID would normally be unique for a server, but does not need to be if there is no need to differentiate between different servers, perhaps grouping by service rather than instance. The id can be set by using the set_id() method on the writer and passing in a worker_id which is defined as an unsigned integer
const muxer::worker_id customer_server_id = { 8973493 };
writer.set_id(custom_server_id);

3. Reading Data From Connect

Only one server instance can connect to Connect to receive incoming messages. If you are sending data to Connect from multiple locations ensure that only one creates an instance of the reader.
Data sent from a connected client will be forwarded through the Connect nodes, arriving at any location you have set up a muxer::reader in the same format as it was sent. Each message will be received as a muxer::client_message which is ultimately a byte array.
In order to retrieve data from the queue in the reader, we can call the deserialize() method that will return a std::vector<client_message>off all the available messages.
muxer::reader reader = {};
...
for (const auto& msg : reader.deserialize()) {
// Perform your logic here
}
One you have your client_message there are several methods you can call on it to help processing.

Finding the source of the message

msg.get_source_userid() can be used to find the player id that sent the message. This is returned as an optional value as the player id may not have been sent. To test for the presence of a value use the has_value method.
aether::optional<uint64_t> maybe_player_id = msg.get_source_user_id();
if ( maybe_player_id.has_value()) {
std::cout << "Recieved Message from Player " << maybe_player_id.value() << std::endl;
}

Getting the message payload

To retrieve the actual byte data the get_payload method can be called on the client_message that will return the raw byte array sent. However client_message also includes a useful helper method that can be used to determine the type of the message based on the message size called payload_as_pod . This method will return a true / false response based on if the message matches the size of the passed variables data type, whilst also copying the data into that variable.
// Define a message type that may be recieved from a connected client
struct event_connect {
vec3f position;
uint64_t player_id;
uint64_t entity_id;
float rotation;
uint8_t model;
uint8_t animation;
};
event_connect connect; // Create an instance of our data type to try to populate
// Test if the message is a connect event
if (msg.payload_as_pod(connect)) {
std::out << "Recieved Message from Player " << maybe_player_id.value() << std::endl;
// Here we can handle any previously unrecognised messages
} else if (msg.get_payload().has_value()) {
const aether::container::span<const char>& payload = msg.get_payload().value();
std::cerr << "payload size " << payload.size_bytes() << std::endl;
std::cerr << "payload # elements " << payload.size() << std::endl;
std::cerr << "payload pointer " << payload.data() << std::endl;
}
When reading from Connect it is good practise to regularly collect received messages. Typically this would be done on every tick of your simulation.

4. Writing Data to Muxer

Connect is designed to send out the latest "State" from a server to a connected client. In order to synchronise data to ensure consistency it uses the concept of a tick to identify how recent data is. Connect will try to send the most up to date data to the client which it will determined by the tick number. Older data will be discarded in favour of sending the latest. This allows Connect to catch up should the server exceed the Connect's processing capacity, or the client is unable to receive data at the rate being transmitted.
Data will be dropped when data from a more recent tick is received from a server with the same server ID. If you do not set different server IDs then the ticks on your servers will need to be synchronised.
Data sent to Connect will need to be serialised to send over the wire. This is achieved by calling the serialise() method on the writer. The serialize() method takes a callback where you will load data into the writers buffer, and a tick id to indicate which time step this data should be considered part of.
To add data to the buffer in your callback to send use the push_bytes() on the writerProxy. As parameters it take the data to send and the size of that data. You can call push_bytes() as often as needed before eventually sending the data with send()
Entity data needs to conform to the entity_data struct found in the protocol.hh file. This requires a static header that includes the location of the entity, an ID and a state that is understood by Connect . The state should be set to ENTITY_STATE_NONE. This field will be updated by Connect before being received by the client to indicate
#include "protocol.hh"
#include "marshalling.hh"
muxer::writer writer = {};
...
// Define a lambda function that will be called to serialise you data
auto send_data = [&](auto writerProxy) {
entity_data ent;
ent.static_data.id = 1234;
ent.static_data.position.x = 1;
ent.static_data.position.y = 2;
ent.static_data.position.z = 0;
ent.static_data.state = ENTITY_STATE_NONE;
ent.user_data = { "test" }; // Define your data
// The marshalling function provides a convenienent way of serializing data
marshalling::marshaller m;
m.add_entity(ent);
auto data = m.encode();
writerProxy.push_bytes(data.data(), data.size());
writerProxy.send();
};
// Set the tick for the data, in your real code this should increment on eachnew state increment
uint64_t tick = 0;
writer.serialize(send, tick);

5. Shutting Down

When your application no longer needs to maintain a connection to Connect, the writer and reader should be shutdown, this can be done by calling the shutdown() method on each instance
writer.shutdown();
reader.shutdown();