Using Erlang with CORBA and DDS

Middleware News Brief (MNB) features news and technical information about Open Source middleware technologies.

INTRODUCTION

The trend in modern computer systems is toward increased numbers of processing units — more cores on a single die, multiple processors in a single machine, or linking machines into larger and larger clusters. As a consequence, complex concurrent applications that are difficult to develop are becoming the norm. The Erlang [1] language can simplify the task of writing distributed concurrent applications. Although a system can be developed entirely in Erlang, interaction with non-Erlang systems is a necessity. This article will show how Erlang can send and receive messages from the outside world via CORBA and DDS.

Erlang was developed over two decades ago by Ericsson for the implementation of large-scale telephony systems, and it is now used by companies [2] such as Amazon, Yahoo!, and Facebook, and in open-source applications such as CouchDB [3], a database engine, and RabbitMQ [4], an implementation of the AMQP messaging protocol [5].

Erlang is a functional language, relying strongly on pattern matching and recursion. Erlang processes are cheap to create, and use message passing, rather than shared memory, for communication. As there is no shared state, computations do not need to block on synchronization primitives which would guard that state, allowing an Erlang system to scale well with the number of available processors.

Another major strength of the Erlang language is fault-tolerance. Processes can be linked and if a process fails, all processes that are linked to it, even if running on a different physical machine, are notified. Action can then be taken to restart the process elsewhere, or to fail-over to alternative behavior. Running code can also be upgraded on the fly, without having to stop the system as a whole. These features allow Erlang-based systems to run continuously for years, without downtime.

In this article, we will use TAO [6] and OpenDDS [7] as the CORBA and DDS implementations, respectively. The code that accompanies this article has been tested under 64-bit Windows 7 with Visual Studio 2010, 32-bit GNU/Linux with GCC 4.3.2, and 64-bit GNU/Linux with GCC 4.4.3 (TAO and DDS compiled as 64-bit, with a 64-bit Erlang distribution). Although Erlang concepts will be described as needed, a full tutorial is beyond the scope of this article. For more information, please consult the several books that have been published [8, 9, 10], and the websites that have been created [11, 12, 13], that delve deeply into the language and its use.

CORBA

The Erlang distribution includes a CORBA ORB called Orber as part of the Open Telecom Platform (OTP). OTP is a collection of libraries and procedures for producing distributed applications. While Orber tutorials can be found at [14] and [15], we will extend the example presented in the Middleware News Brief Multi-Language CORBA Development with C++ (TAO), Java (JacORB), Perl (opalORB), and C# (IIOP.NET) [16] to include Erlang.

In that article, servers were created in each language to provide an object that implements the Math interface, as follows:

// CORBA/Erlang/Math.idl
module MathModule {
    interface Math 
    {
        long Add(in long x, in long y);   
    };
};

Clients were then created in each language, and tests run to demonstrate that clients in any language can connect to servers in any language. For this test, we will only reference the C++ client and server in order to demonstrate interoperability between Orber and other ORBs.

We begin by compiling the IDL file with the Erlang compiler.

erlc Math.idl

The files MathModule.hrl, MathModule_Math.erl, MathModule_Math.hrl, oe_Math.erl, and oe_Math.hrl are created, which include client stubs and other CORBA infrastructure. We can also create server stubs by running the compiler again, and specifying a back-end template:

erlc +"{be,erl_template}" Math.idl

This creates the file MathModule_Math_impl.erl. In this file, the skeleton for the Add method is created as:

'Add'(State, X, Y) ->
    {reply, OE_Reply, State}.

In Erlang, variables begin with uppercase letters, but function names must be atoms (non-numerical constants). The single quotes are used to convert the term into an atom. The variables X and Y are mapped from CORBA longs to Erlang integers; see [17] for the complete CORBA type mapping. We now implement the function as a sum of X and Y, assigning the result to OE_Reply. Within a function, expressions in an expression list are separated by commas and executed sequentially. Here, the last expression is a 3-tuple (denoted by curly braces) to return from the function, with the first element of the tuple the atom reply, the second the variable OE_Reply, and the third the variable State that was passed into the function.

'Add'(State, X, Y) ->
    OE_Reply = X + Y,
    {reply, OE_Reply, State}.

We can now create server and client tests. Create a file named crb.erl. Code is arranged into modules, as specified by the -module() declaration. Functions within a module that are to be used outside of the module are specified in a list (denoted by square brackets) in an -export() declaration. The name of each function to export is followed by a slash, and then the arity (number of parameters) of the function. Here, we export the client and server test functions. Each top-level form in Erlang, including these declarations, is terminated by a period followed by whitespace. Comments begin with %; by convention, %% is used here.

%% CORBA/Erlang/crb.erl
-module(crb).
-export([client_test/1, server_test/1]).

Next, we create a helper function, start_orber(), to initialize a single-node instance of Orber.

start_orber(Name) ->
    mnesia:start(),
    corba:orb_init([{domain, Name}, 
                    {orber_debug_level, 10}, 
                    {iiop_port, 0} ]),
    orber:install([node()], 
                  [{ifr_storage_type, ram_copies},
                   {nameservice_storage_type, ram_copies}]),
    orber:start().

This function takes the name of the domain as an argument. Orber instances within the same domain communicate via the Erlang distribution protocol, but communication between instances in different domains is by the OMG's GIOP. Attempting to use GIOP between a client and server in the same domain will cause an OBJECT_NOT_EXIST CORBA exception to be raised.

start_orber() begins by calling mnesia:start() which starts the Mnesia database [18]. Orber uses Mnesia to store internal data, where the database can be maintained fully in RAM, as indicated by the ram_copies options to orber:install(). If database persistence is desired, a Mnesia schema must first be created, and disc_copies specified, as described in [14]. For our purposes, a RAM-only database is sufficient.

Next, corba:orb_init() is called to initialize the ORB. (In Erlang, functions that have been exported from modules are called by prefixing the function name by the module they reside in, followed by a colon.) A large number of options [19] can be set, but here only the domain, debug level, and port are specified. A value of 0 for the port causes a random, unused port to be chosen. If the iiop_port option is not provided, the port defaults to 4001. With that fixed default, a second Orber instance cannot start on the same machine, because two sockets cannot listen on the same port simultaneously. These options are presented as a list (in square brackets) of tuples.

Finally, orber:install() is called to configure Orber to use the current node and in-memory tables, and orber:start() runs the ORB.

We now define an additional helper method to read a stringified IOR of a server from a file, and return it as a string. file:read_file() returns a 2-tuple. Upon success, the first element is the atom ok, and the second element is a binary data object which is the contents of the file. In Erlang, strings are lists of characters so erlang:binary_to_list() is called to convert the binary object to a string.

readIOR(FileName) ->
    {ok, Binary} = file:read_file(FileName),
    erlang:binary_to_list(Binary).

We can now write the client test.

client_test(["ior", IORFile, "add", Xp, Yp]) ->
    start_orber("client"),
    {X, _} = string:to_integer(Xp),
    {Y, _} = string:to_integer(Yp),
    oe_Math:oe_register(),
    Obj = corba:string_to_object(readIOR(IORFile)),
    Res = 'MathModule_Math':'Add'(Obj, X, Y),
    io:format("Sum: ~p~n", [Res]),
    init:stop().

This function accepts a list of strings as its parameter, where the first and third elements are "ior" and "add", respectively. The function first starts Orber in the client domain, and then converts the X and Y values from their initial string representation to integers via the string:to_integer() function. The second element of the 2-tuple returned by string:to_integer() is a list of unconverted text. Here, all text will be converted and the list will be empty, so we use an underscore to match against the tuple element that we do not care to receive.

The Math interface is registered in the Interface Repository by the call to oe_Math:oe_register(), and the stringified IOR is converted to an object reference via corba:string_to_object().

Calls to CORBA methods are via the syntax Module:Method(ObjectReference, Parameters), here as Res = 'MathModule_Math':'Add'(Obj, X, Y). The result is printed to standard output by io:format(). The test ends with a call to init:stop() which shuts down the currently-running Erlang node.

Before running the test, the environment variable ERL_ROOT must be set to the root of the Erlang installation. A typical installation path under 64-bit Windows 7 is C:\Program Files (x86)\erl5.8.2. The script run_test.pl in the CORBA/Test directory runs the client test with a command-line (entered on a single line) similar to:

<ERL_ROOT>/bin/erl -pa ../Erlang -noshell 
    -run crb client_test ior server.ior add 5 7

The -pa argument adds the ../Erlang directory to the start of the module search path. The -noshell argument runs Erlang without starting an interactive shell. The -run argument invokes the client_test() function in the crb module, with the arguments ior server.ior add 5 7 passed as a list of strings. These match the arguments ["ior", IORFile, "add", Xp, Yp] of client_test(). The complete list of arguments that can be passed to erl can be found in [20]. Note that arguments beginning with a dash are directed to erl and not passed as function arguments, so ior and add are used instead of -ior and -add as is done with the ORBs in the other programming languages. Save for this difference, the run_test.pl used here is the same as in the aforementioned Middleware News Brief.

We next write the server test, which is simpler than the client test, as follows:

server_test([IORFile]) ->
    start_orber("server"),
    Obj = 'MathModule_Math':oe_create(),
    writeIOR(IORFile, corba:object_to_string(Obj)).

The server accepts one parameter, IORFile, the name of the file to which it will write the server's IOR. Orber is started in the server domain, the Math server begins execution with oe_create(), the returned object reference is stringified, and written to the file via the writeIOR helper function, defined as:

writeIOR(FileName, IOR) ->
    {ok, FileDesc} = file:open(FileName, [write]),
    file:write(FileDesc, IOR),
    file:close(FileDesc).

As with the client test, the server test is executed by run_test.pl in an analogous way:

<ERL_ROOT>/bin/erl -pa ../Erlang -noshell 
    -run crb server_test erlang.ior

After compiling all of the .erl files, MathModule_Math.erl, oe_Math.erl, MathModule_Math_impl.erl and crb.erl, with erlc, running CORBA/Test/run_test.pl produces output such as:

Starting TAO server
Starting orber server
Running test: TAO server, TAO client
  84+81 => Expected: 165  Actual: 165  => success
Running test: TAO server, orber client
  89+9 => Expected: 98  Actual: 98  => success
Running test: orber server, TAO client
  35+16 => Expected: 51  Actual: 51  => success
Running test: orber server, orber client
  98+3 => Expected: 101  Actual: 101  => success
Stopping TAO server
Stopping orber server

The output is now similar to that of the prior MNB, showing that Orber can interoperate with TAO.

DDS

Erlang has no direct support for DDS, as it does for CORBA, but it can interoperate with C/C++, the implementation language of OpenDDS. We will create a "C Node" [21], an Erlang node implemented in C (C++, in our case), to act as a gateway between Erlang and DDS.

We will use a variant of the DDS Messenger sample [22] for our purposes, with the following IDL:

// DDS/CPP/DDS_IDL/Messenger.idl
module Messenger {
    #pragma DCPS_DATA_TYPE "Messenger::Message"
 
    struct Message {
        string msg;
        long id;
    };
};

We wish to create an Erlang subscriber which receives Messenger samples, as well as an Erlang publisher which sends Messenger samples. We will begin by defining a message protocol between the Erlang processes and the C++ gateway.

We need one message to support publishing from Erlang, three to support subscription, and one for Gateway termination. We will define them as tuples, with the first element an atom indicating the message type.

{ publish, { Message, Id } }    Send the associated tuple as the DDS sample
{ subscribe, Pid }              Add Pid, the ID of the current process, as a process to send DDS samples to
{ unsubscribe, Pid }            Remove Pid, the ID of the current process, as a process to send DDS samples to
{ shutdown }                    Terminate the gateway
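
On the C++ side, a gateway has to tell these messages apart before acting on them. The following is a minimal sketch of that step, not the gateway code that accompanies this article. MessageKind and Classify() are hypothetical names, and the arities assume the tuples are laid out as they appear in the Erlang code in this article ({ publish, { Message, Count } }, { subscribe, self() }, { shutdown }), with unsubscribe assumed to mirror subscribe.

```cpp
#include <cassert>
#include <string>

// Message kinds from the protocol above; Unknown covers anything else.
enum class MessageKind { Publish, Subscribe, Unsubscribe, Shutdown, Unknown };

// Hypothetical helper: map the leading atom and the arity of the outer
// tuple to a MessageKind. Both must match for a message to be accepted.
MessageKind Classify(const std::string &atom, int arity) {
    if (atom == "publish" && arity == 2)
        return MessageKind::Publish;
    if (atom == "subscribe" && arity == 2)
        return MessageKind::Subscribe;
    if (atom == "unsubscribe" && arity == 2)   // arity assumed, see lead-in
        return MessageKind::Unsubscribe;
    if (atom == "shutdown" && arity == 1)
        return MessageKind::Shutdown;
    return MessageKind::Unknown;
}
```

An unrecognized atom, or a known atom with the wrong arity, yields MessageKind::Unknown, which a gateway could log and ignore rather than treat as an error.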

In our example, the publisher sends samples with IDs counting down from 10. The sample with ID 0 marks the end of the test. We begin the file dds.erl with the module and export declarations below. We will use the same Erlang source file for both the publisher and subscriber tests, so the necessary functions for both tests are exported.

%% DDS/Erlang/dds.erl
-module(dds).
-export([publisher_test/1, subscriber_test/1, init_stop/0]).

The publisher_test() function calls the internal publisher_test_loop() function with three arguments: the node name of the gateway, the message to send, and a loop count of 10 iterations. As before, arguments from the test script are passed as strings, so the node name of the gateway is converted to an atom before being passed to publisher_test_loop().

publisher_test([Gateway, Message]) ->
    publisher_test_loop(list_to_atom(Gateway), Message, 10).

The function publisher_test_loop() is:

publisher_test_loop(Gateway, Message, Count) ->
    if 
      Count >= 0 -> 
        io:format("[Erlang ~p] Publish: msg='~p' id=~p~n", 
          [node(), Message, Count]),
        { any, Gateway } ! { publish, { Message, Count } },
        wait(1000),
        publisher_test_loop(Gateway, Message, Count-1);
      true ->
        { any, Gateway } ! { shutdown },
        io:format("[Erlang ~p]: done~n", [node()]),
        init_stop()
    end.

Each element of an Erlang if expression is a guard (boolean expression), followed by an arrow, followed by a sequence of expressions. Guards are evaluated in the order presented: the first guard that evaluates to true has its associated expression list evaluated, and the if expression terminates. The first guard above is true when the value of Count is greater than, or equal to, zero.

When that condition holds, text is printed to the console showing the message to be published, and a publish message is sent to the gateway node. The exclamation point is the send operator: the term on its left is the destination and the term on its right is the message. Here the destination is a { Name, Node } tuple that addresses a registered process on another node rather than a process ID; the gateway in this article does not examine the name, so the placeholder atom any is used. After the message is sent, execution pauses for one second before continuing. The code then proceeds by recursively calling publisher_test_loop() with the message count reduced by one. Looping by tail recursion is a common pattern in Erlang: the state of the computation is passed as arguments to the looping function, modified as necessary on each iteration.

If the first guard of the if expression fails, the next, and in this case last, guard is evaluated. The atom true always succeeds as a guard, so here it acts as an else clause to the if. This branch is reached once the count has dropped below zero, that is, after the sample with ID 0 has been published. In that case, the shutdown message is sent to the gateway, an indication of completion is printed to the console, and the Erlang node is terminated. Exiting the function would normally be performed by simply not performing another tail-recursive call, but for the purposes of the test we desire the entire Erlang node to shut down.

The subscriber test begins with a call to the subscriber_test() function. First, the subscribe message is sent to the gateway, to inform the gateway that the current process is interested in receiving DDS samples — the function self() returns the process ID of the current process. The subscriber_test_loop() function is then called to continue the test.

subscriber_test([Gateway]) ->
    { any, list_to_atom(Gateway) } ! { subscribe, self() },
    subscriber_test_loop(list_to_atom(Gateway)).

The subscriber_test_loop() provides a selective receive. Messages, when sent to a process, are stored in a mailbox for later retrieval. The receive...end expression performs a pattern-match against the messages in the mailbox, returning the first that matches. It is good to provide a wildcard match against unexpected messages, as otherwise, messages that are not extracted from the mailbox will remain and continue to consume memory.

subscriber_test_loop(Gateway) ->
    receive
        { message, _, 0 } ->
            { any, Gateway } ! { shutdown },
            io:format("[Erlang ~p]: done~n", [node()]),
            init_stop();
        { message, Msg, Id } ->
            io:format("[Erlang ~p] Received: msg='~p' id=~p~n", 
              [node(), Msg, Id]),
            subscriber_test_loop(Gateway);
        Any ->
            io:format(
              "[Erlang ~p]: Received unknown message ~p~n", 
              [node(), Any]),
            subscriber_test_loop(Gateway)
    end.

The first pattern matched against is { message, _, 0 }, where a matching message consists of a 3-tuple with the atom message in the first position, the value 0 in the third position, and anything in the second position. As a message with ID 0 is considered the termination message, if this is received, the shutdown message is sent to the gateway, a status indication is printed to the console, and, as before for the purpose of the test, the Erlang node is shut down.

The second pattern matched against is { message, Msg, Id }. This message is similar in structure to the previous, but, if the matching process has proceeded this far, then the ID value cannot be zero — so we bind the variables Msg and Id to the message text and count respectively. We then display a status message to the screen indicating that the incoming message was received, and then tail-recurse, waiting for the next message to arrive.

The final pattern causes any unknown messages to be accepted and discarded.

In the above, two helper functions were used: init_stop() and wait(). init_stop() is simply:

init_stop() -> init:stop().

where the supplied stop() function in the init module is invoked. wait() is implemented as:

wait(MS) ->
    receive
        after MS -> true
    end.

The receive...end expression blocks indefinitely by default, but an after clause bounds the wait: its body is evaluated if no message has matched within the timeout interval. Here, no message patterns are given at all, so the receive simply waits for the specified number of milliseconds and then returns true.

The gateway consists of three logical pieces: a DDS publisher, a DDS subscriber, and an Erlang node to bridge Erlang and C++. We will begin by developing the Erlang node.

Erlang provides the Erlang Interface C-API [23] for developing an Erlang node in C or C++ (a "C Node" [21]). Before generating project files with MPC [24] and compiling the code, be sure to set the environment variable ERLI_ROOT to point to the directory containing the root of the Erlang interface headers and library files. A typical installation path under 64-bit Windows 7 is C:\Program Files (x86)\erl5.8.2\lib\erl_interface-3.7.2.

In the file ErlangNode.h, begin as follows:

// DDS/CPP/Gateway/ErlangNode.h
#include "XBuff.h"
#include "Runnable.h"
 
class ErlangNode : public Runnable {
protected:
    std::string _shortName;
    std::string _secretCookie;
    int _port;
    int _fd;

The short name of the node is used as the node's address, and corresponds to the Gateway parameter in the Erlang code above. Erlang nodes must share the same secret cookie value in order to communicate, so the Erlang nodes and C node must use the same cookie. A C node opens a TCP socket for communication to Erlang, so the socket's port and file descriptor are maintained.

ErlangNode is designed as a base class for easy implementation of C nodes. As such, two methods to override are provided. OnMessage() will be called when an Erlang message arrives, and OnIdle() will be repeatedly called when there are no incoming messages to process, so other work can be performed.

protected:
    virtual bool OnMessage(erlang_pid /*from*/, 
        XBuff& /*buff*/) { return true; }
    virtual bool OnIdle() { return true; }

The Listen() method opens a socket on the given port.

    int Listen(int port) {
        int listen_fd;
        struct sockaddr_in addr;
        int on = 1;
        if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
            return (-1);
        setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, 
            (char *)&on, sizeof(on));
        memset((void*)&addr, 0, (size_t) sizeof(addr));
        addr.sin_family = AF_INET;
        addr.sin_port = htons(port);
        addr.sin_addr.s_addr = htonl(INADDR_ANY);
        if (bind(listen_fd, (struct sockaddr*) &addr, 
            sizeof(addr)) < 0)
            return (-1);
        listen(listen_fd, 5);
        return listen_fd;
    }

The Send() method sends an Erlang message, stored in an XBuff, to a specified recipient.

    void Send(erlang_pid to, XBuff &buff) {
        if (ei_send(_fd, &to, buff.get()->buff, 
            buff.get()->index) < 0)
            throw EIException("ei_send failed", 
                erl_errno);
    }

The constructor stores the node name, cookie, and port for later use.

    ErlangNode(std::string shortName, std::string secretCookie,
        int port) : _shortName(shortName), 
        _secretCookie(secretCookie), _port(port) {}

The main work is performed in the Svc() method. We first initialize the ei_cnode structure. For most API functions, erl_errno is set when an error has occurred.

    void *Svc() {
        try {
            ei_cnode ec;
            int n=0;
            if (ei_connect_init(&ec, _shortName.c_str(),
                _secretCookie.c_str(), n++) < 0) 
                throw EIException("ei_connect_init failed", 
                erl_errno);

Next, we create a socket for listening.

            int listen;
            if ((listen = Listen(_port)) < 0)
                throw EIException("Listen failed", errno);

A daemon process, epmd, runs on each Erlang host, and the port that we are listening on must be registered with it.

            if (ei_publish(&ec, _port) == -1)
                throw EIException("ei_publish failed", 
                erl_errno);

We now wait for communication with an Erlang node to be established.

            ErlConnect conn;
            if ((_fd = ei_accept(&ec, listen, &conn)) 
                == ERL_ERROR)
                throw EIException("ei_accept failed", 
                erl_errno);

Once connected, the main loop executes. The function ei_xreceive_msg_tmo() is called to receive Erlang messages. If it returns ERL_MSG and the message type is ERL_REG_SEND or ERL_SEND, a message has arrived, so OnMessage() is called to process it. If the return value is ERL_ERROR, an error or timeout has occurred, so OnIdle() is called. For our purposes, other return values and message types can be ignored.

If OnMessage() or OnIdle() returns true, the loop continues; otherwise the loop exits, and the Erlang node with it.

            while (true) {
                erlang_msg msg;
                XBuff buff(false);
 
                int rcv = ei_xreceive_msg_tmo(_fd, &msg, 
                    buff.get(), 10); // 10 ms
                if (rcv == ERL_MSG) {
                    if ((msg.msgtype == ERL_REG_SEND) || 
                        (msg.msgtype == ERL_SEND)) {
                        if (!OnMessage(msg.from, buff))
                            break;
                    }
                    // ignore other message types
                }
                else if (rcv == ERL_ERROR) {
                    if (!OnIdle())
                        break;
                }
                // ignore other ei_xreceive_msg_tmo return values
            }
        }
        catch (std::exception &e) {
            std::cerr << "Exception: " << e.what() << std::endl;
        }
        return 0;
    }
};
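
The control flow of Svc() can be tried out without an Erlang connection. The sketch below is an illustration only: it replaces ei_xreceive_msg_tmo() with a scripted list of outcomes and keeps the same contract for the two hooks, where returning false ends the loop. The names Poll, LoopHooks, RunLoop, and StopOnFirstMessage are hypothetical and not part of the accompanying code.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Stand-ins for the outcomes of ei_xreceive_msg_tmo() that Svc() handles.
enum class Poll { Message, TimeoutOrError, Other };

// Same contract as the ErlangNode hooks: returning false ends the loop.
class LoopHooks {
public:
    virtual ~LoopHooks() {}
    virtual bool OnMessage() { return true; }
    virtual bool OnIdle() { return true; }
};

// Drive the hooks from a scripted list of poll results instead of a socket.
// Returns the number of poll results consumed before the loop ended.
std::size_t RunLoop(LoopHooks &hooks, const std::vector<Poll> &script) {
    std::size_t consumed = 0;
    for (Poll p : script) {
        ++consumed;
        if (p == Poll::Message) {
            if (!hooks.OnMessage())
                break;
        } else if (p == Poll::TimeoutOrError) {
            if (!hooks.OnIdle())
                break;
        }
        // Poll::Other is ignored, as in Svc()
    }
    return consumed;
}

// Example subclass: counts idle calls and stops at the first message.
class StopOnFirstMessage : public LoopHooks {
public:
    int idleCalls = 0;
    bool OnMessage() override { return false; }
    bool OnIdle() override { ++idleCalls; return true; }
};
```

Feeding RunLoop() a script with two timeouts, one ignored result, and then a message makes StopOnFirstMessage see two idle calls and end the loop on the fourth entry, leaving any later entries unread.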

We wish the Erlang node to run independently of the main thread of the gateway, so we define the interface Runnable to represent code which can be executed in its own thread. Although not needed by our example, this is beneficial as it allows multiple Erlang nodes to be hosted independently within a single C++ application.

// DDS/CPP/Gateway/Runnable.h
#include <ace/Thread.h>
 
class Runnable {
public:
    virtual void *Svc() = 0;
};

Class Runner executes code that is Runnable in the context of an ACE_Thread. The Start() method begins execution, while Wait() blocks until the code being executed terminates. A more fully-featured class would implement cancellation as well, but that is not needed for our example.

class Runner {
    ACE_hthread_t _thread;
    Runnable *_runnable; 
 
    static void *Run(void *p) {
        Runnable *runnable = 
            (reinterpret_cast<Runner *>(p))->_runnable;
        void *rtn = runnable->Svc();
        delete runnable;
        return rtn;
    }
 
public:
    Runner(Runnable *runnable) : _thread(0), 
        _runnable(runnable) {}
 
    void Start() {
        ACE_Thread::spawn(reinterpret_cast<ACE_THR_FUNC>(Run), 
            this, THR_NEW_LWP|THR_JOINABLE, 0, &_thread);
    }
 
    void Wait() {
        if (_thread != 0)
            ACE_Thread::join(_thread);
    }
};
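
As one possible shape for the cancellation mentioned above, the following sketch swaps ACE_Thread for std::thread (C++11) and passes a std::atomic<bool> flag to the running code, which is expected to poll it. CancellableRunnable, CancellableRunner, and Ticker are hypothetical names. Unlike Runner, this version does not delete the runnable when it finishes; that is a design choice of the sketch, not of the article's code.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Variant of Runnable whose Svc() receives a flag to poll for cancellation.
class CancellableRunnable {
public:
    virtual ~CancellableRunnable() {}
    virtual void Svc(const std::atomic<bool> &cancelled) = 0;
};

// Hypothetical counterpart to Runner, built on std::thread rather than
// ACE_Thread. It refers to the runnable but does not own it.
class CancellableRunner {
    std::thread _thread;
    std::atomic<bool> _cancelled;
    CancellableRunnable &_runnable;

public:
    explicit CancellableRunner(CancellableRunnable &runnable)
        : _cancelled(false), _runnable(runnable) {}

    // Cancel and join on destruction so a running thread is never abandoned.
    ~CancellableRunner() { Cancel(); Wait(); }

    void Start() {
        assert(!_thread.joinable());  // Start() may only be called once
        _thread = std::thread([this] { _runnable.Svc(_cancelled); });
    }

    // Cooperative cancellation: this only sets the flag; Svc() must check it.
    void Cancel() { _cancelled.store(true); }

    void Wait() {
        if (_thread.joinable())
            _thread.join();
    }
};

// Example runnable: counts iterations until asked to stop.
class Ticker : public CancellableRunnable {
public:
    std::atomic<long> ticks{0};
    void Svc(const std::atomic<bool> &cancelled) override {
        do {
            ++ticks;
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        } while (!cancelled.load());
    }
};
```

Because cancellation is cooperative, a Svc() that blocks for a long time without checking the flag will delay Wait() accordingly. The 10 ms receive timeout in ErlangNode::Svc() would give such a check a natural place, although the article's code does not include one.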

Erlang messages are packed into ei_x_buff structures — class XBuff provides a thin C++ wrapper around the life-cycle management, encoding and decoding functions to simplify the use of ei_x_buff. The class XBuff itself creates and destroys an ei_x_buff, plus provides access to encoder and decoder objects.

// DDS/CPP/Gateway/XBuff.h
class XBuff {
    ei_x_buff _buff;
 
public:
    XBuff(bool initWithVersion) {
        if (initWithVersion)
            ei_x_new_with_version(&_buff);
        else
            ei_x_new(&_buff);
    }
    ~XBuff() {
        ei_x_free(&_buff);
    }
 
    ei_x_buff *get() { return &_buff; }
 
    XBuffDecoder GetDecoder() { return XBuffDecoder(_buff); }
    XBuffEncoder GetEncoder() { return XBuffEncoder(_buff); }
};
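
One property of XBuff worth noting is that, as written, it can be copied, and a copy would call ei_x_free() on the same buffer a second time when destroyed. The code in this article only passes XBuff by reference, so the issue does not arise here. The sketch below shows one way to rule it out, using a stand-in buffer type so that it compiles without erl_interface. FakeBuff, FakeNew, FakeFree, and OwnedBuff are hypothetical names.

```cpp
#include <cassert>
#include <type_traits>
#include <utility>

// Stand-in for ei_x_buff and its ei_x_new()/ei_x_free() pair.
struct FakeBuff { char *data; };
static int g_liveBuffers = 0;   // number of buffers currently allocated

static void FakeNew(FakeBuff *b) {
    b->data = new char[16];
    ++g_liveBuffers;
}
static void FakeFree(FakeBuff *b) {
    delete [] b->data;
    b->data = nullptr;
    --g_liveBuffers;
}

class OwnedBuff {
    FakeBuff _buff;

public:
    OwnedBuff() { FakeNew(&_buff); }
    ~OwnedBuff() {
        if (_buff.data != nullptr)   // a moved-from object owns nothing
            FakeFree(&_buff);
    }

    // Copying would leave two objects freeing one allocation, so forbid it.
    OwnedBuff(const OwnedBuff &) = delete;
    OwnedBuff &operator=(const OwnedBuff &) = delete;

    // Moving transfers ownership and leaves the source empty.
    OwnedBuff(OwnedBuff &&other) : _buff(other._buff) {
        other._buff.data = nullptr;
    }

    FakeBuff *get() { return &_buff; }
};
```

With the copy operations deleted, an accidental pass-by-value becomes a compile error instead of a double free at run time. In a pre-C++11 code base, declaring the copy constructor and assignment operator private and leaving them undefined achieves a similar effect.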

For our example, only a select few types must be encoded: atoms, longs, strings, and tuples.

class XBuffEncoder {
    ei_x_buff &_buff;
 
    XBuffEncoder(ei_x_buff &buff) : _buff(buff) {}
public:
 
    void SetTupleHeader(int arity) {
        if (ei_x_encode_tuple_header(&_buff, arity) < 0)
            throw EIEncodeException(
                "ei_x_encode_tuple_header failed");
    }
    void SetAtom(std::string atom) {
        if (ei_x_encode_atom(&_buff, atom.c_str()) < 0)
            throw EIEncodeException("ei_x_encode_atom failed");
    }
    void SetLong(long l) {
        if (ei_x_encode_long(&_buff, l) < 0)
            throw EIEncodeException("ei_x_encode_long failed");
    }
    void SetString(std::string str) {
        if (ei_x_encode_string(&_buff, str.c_str()) < 0)
            throw EIEncodeException("ei_x_encode_string failed");
    }
 
    // more types
 
    friend class XBuff;
};

These types, in addition to process IDs, must be decoded.

class XBuffDecoder {
    ei_x_buff &_buff;
    int _offset;
 
    XBuffDecoder(ei_x_buff &buff) : _buff(buff), _offset(0) {}
 
    void GetType(int &type, int &size) {
        if (ei_get_type(_buff.buff, &_offset, &type, &size) < 0)
            throw EIDecodeException("ei_get_type failed");
    }
 
public:
    int GetVersion() {
        int version;
        if (ei_decode_version(_buff.buff, &_offset, &version) < 0)
            throw EIDecodeException("ei_decode_version failed");
        return version;
    }
 
    int GetTupleHeader() {
        int arity;
        if (ei_decode_tuple_header(
            _buff.buff, &_offset, &arity) < 0) 
            throw EIDecodeException(
                "ei_decode_tuple_header failed");
        return arity;
    }
 
    std::string GetAtom() {
        char atom[MAXATOMLEN+1];
        if (ei_decode_atom(_buff.buff, &_offset, atom) < 0)
            throw EIDecodeException("ei_decode_atom failed");
        return atom;
    }
 
    std::string GetString() {
        int type, size;
        GetType(type, size);
        char *p = new char[size+1];
        if (p == NULL)
            throw EIDecodeException("ei_malloc failed");
        if (ei_decode_string(_buff.buff, &_offset, p) < 0) {
            delete [] p;
            throw EIDecodeException("ei_decode_string failed");
        }
        std::string s = p;
        delete [] p;
        return s;
    }
 
    erlang_pid GetPID() {
        erlang_pid pid;
        if (ei_decode_pid(_buff.buff, &_offset, &pid) < 0)
            throw EIDecodeException("ei_decode_pid failed");
        return pid;
    }
 
    long GetLong() {
        long l;
        if (ei_decode_long(_buff.buff, &_offset, &l) < 0)
            throw EIDecodeException("ei_decode_long failed");
        return l;
    }
 
    // more data types
 
    friend class XBuff;
};
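
The decoder advances an offset through the buffer as each term is read. To make that pattern concrete, here is a self-contained sketch that decodes a hand-built buffer in the Erlang external term format without using erl_interface. It handles only the version byte, small tuples, atoms with the ATOM_EXT tag, and one-byte integers; the format defines many more tags, and the encoder in a given Erlang release may choose different ones. MiniDecoder is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Tags from the Erlang external term format; only this subset is handled.
enum : unsigned char {
    kVersionMagic = 131,  // first byte of an encoded term
    kSmallInteger = 97,   // SMALL_INTEGER_EXT: one unsigned byte
    kAtom         = 100,  // ATOM_EXT: 2-byte big-endian length, then chars
    kSmallTuple   = 104   // SMALL_TUPLE_EXT: one byte of arity
};

class MiniDecoder {
    const std::vector<unsigned char> &_bytes;  // must outlive the decoder
    std::size_t _offset;                       // next byte to read

    unsigned char Next() {
        if (_offset >= _bytes.size())
            throw std::runtime_error("buffer exhausted");
        return _bytes[_offset++];
    }
    void Expect(unsigned char tag, const char *what) {
        if (Next() != tag)
            throw std::runtime_error(what);
    }

public:
    explicit MiniDecoder(const std::vector<unsigned char> &bytes)
        : _bytes(bytes), _offset(0) {}

    void GetVersion() { Expect(kVersionMagic, "bad version byte"); }

    int GetTupleHeader() {
        Expect(kSmallTuple, "not a small tuple");
        return Next();
    }

    long GetSmallInteger() {
        Expect(kSmallInteger, "not a small integer");
        return Next();
    }

    std::string GetAtom() {
        Expect(kAtom, "not an atom");
        std::size_t hi = Next();   // read in two statements so the byte
        std::size_t lo = Next();   // order is well defined
        std::size_t len = (hi << 8) | lo;
        std::string atom;
        for (std::size_t i = 0; i < len; ++i)
            atom.push_back(static_cast<char>(Next()));
        return atom;
    }
};
```

For example, the bytes 131, 104, 2, 100, 0, 3, 'a', 'c', 'k', 97, 7 decode as the tuple { ack, 7 } when read with GetVersion(), GetTupleHeader(), GetAtom(), and GetSmallInteger(), in that order. Calling the getters in the wrong order raises an exception, much as XBuffDecoder does when an ei_decode_* call fails.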

We now turn to the main gateway application in main.cpp. We use an ACE_Arg_Shifter to parse the command-line arguments.

// DDS/CPP/Gateway/main.cpp
void GetArgs(int argc, char *argv[],
    std::string &sname, std::string &secretcookie, int &port) {
        ACE_Arg_Shifter arg_shifter(argc, argv);
        while (arg_shifter.is_anything_left()) {
            const ACE_TCHAR *currentArg = 0;
            if ((currentArg = arg_shifter.get_the_parameter(
                ACE_TEXT("-sname"))) != 0) {
                sname = currentArg;
                arg_shifter.consume_arg();
            }
            else if ((currentArg = arg_shifter.get_the_parameter(
                ACE_TEXT("-setcookie"))) != 0) {
                secretcookie = currentArg;
                arg_shifter.consume_arg();
            }
            else if ((currentArg = arg_shifter.get_the_parameter(
                ACE_TEXT("-port"))) != 0) {
                port = ACE_OS::atoi(currentArg);
                arg_shifter.consume_arg();
            }
            else
                arg_shifter.ignore_arg();
        }   
}
 
int ACE_TMAIN(int argc, ACE_TCHAR *argv[]) {
    try {
        std::string sname("alice"), secretcookie("secretcookie");
        int port = 8001;
        GetArgs(argc, argv, sname, secretcookie, port);

We now implement standard DDS publisher and subscriber infrastructure:

        DDS::DomainParticipantFactory_var dpf =
            TheParticipantFactoryWithArgs(argc, argv);
 
        // create domain participant
        DDS::DomainParticipant_var participant =
            dpf->create_participant(42,
            PARTICIPANT_QOS_DEFAULT,
            0,
            OpenDDS::DCPS::DEFAULT_STATUS_MASK);
 
        if (0 == participant) 
            throw MyException("create_participant failed");
 
 
        // register type
        Messenger::MessageTypeSupport_var ts =
            new Messenger::MessageTypeSupportImpl();
 
        if (ts->register_type(participant.in(), "") != 
            DDS::RETCODE_OK) 
            throw MyException("register_type failed");
 
 
        // create topic
        CORBA::String_var type_name = ts->get_type_name();
        DDS::Topic_var topic =
            participant->create_topic("MyTopic",
            type_name.in(),
            TOPIC_QOS_DEFAULT,
            0,
            OpenDDS::DCPS::DEFAULT_STATUS_MASK);
 
        if (0 == topic) 
            throw MyException("create_topic failed");
 
 
        // create publisher
        DDS::Publisher_var publisher =
            participant->create_publisher(PUBLISHER_QOS_DEFAULT,
            0,
            OpenDDS::DCPS::DEFAULT_STATUS_MASK);
 
        if (0 == publisher) 
            throw MyException("create_publisher failed");
 
 
        // create subscriber
        DDS::Subscriber_var subscriber =
            participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT,
            0,
            OpenDDS::DCPS::DEFAULT_STATUS_MASK);
 
        if (0 == subscriber) 
            throw MyException("create_subscriber failed");

OpenDDS, as of this writing, allows both the publisher and subscriber to share the same transport.

        // create and attach the transport
        OpenDDS::DCPS::TransportImpl_rch transport_impl =
            TheTransportFactory->create_transport_impl(
                OpenDDS::DCPS::DEFAULT_SIMPLE_TCP_ID,
                OpenDDS::DCPS::AUTO_CONFIG);
 
        if (transport_impl->attach(publisher.in()) != 
            OpenDDS::DCPS::ATTACH_OK)
            throw MyException(
                "transport creation for the publisher failed");
 
        if (transport_impl->attach(subscriber.in()) != 
            OpenDDS::DCPS::ATTACH_OK)
            throw MyException(
                "transport creation for the subscriber failed");

We finish the publisher code by creating a DataWriter.

        // create and narrow datawriter
        DDS::DataWriter_var writer =
            publisher->create_datawriter(topic.in(),
            DATAWRITER_QOS_DEFAULT,
            DDS::DataWriterListener::_nil(),
            OpenDDS::DCPS::DEFAULT_STATUS_MASK);
 
        if (0 == writer) 
            throw MyException("create_datawriter failed");
 
        Messenger::MessageDataWriter_var message_writer =
            Messenger::MessageDataWriter::_narrow(writer.in());
 
        if (0 == message_writer) 
            throw MyException("writer _narrow failed");

We complete the subscriber code by creating a DataReader and its associated listener.

        // create a common message queue
        MessageQueue<MessageType> messageQueue;
 
 
        // create and narrow datareader, assigning listener
        DDS::DataReaderListener_var listener(new 
            DataReaderListenerImpl(messageQueue));
 
        DDS::DataReader_var reader =
            subscriber->create_datareader(topic.in(),
            DATAREADER_QOS_DEFAULT,
            listener.in(),
            OpenDDS::DCPS::DEFAULT_STATUS_MASK);
 
        if (0 == reader) 
            throw MyException("create_datareader failed");

MessageQueue is a thread-safe list to hold DDS samples as they are received. The list itself is implemented by using an ACE_Guard to protect methods of a std::list:

// DDS/CPP/Gateway/MessageQueue.h
#include <list>
#include <ace/Guard_T.h>
#include <ace/Thread_Mutex.h>
 
template<typename T>
class MessageQueue {
    std::list<T> _q;
    ACE_Thread_Mutex lock_;
 
public:
    void Insert(T t) {
        ACE_Guard<ACE_Thread_Mutex> guard(lock_);
        _q.push_back(t);
    }
 
    T Remove() {
        ACE_Guard<ACE_Thread_Mutex> guard(lock_);
        T front = _q.front();
        _q.pop_front();
        return front;
    }
 
    bool empty() {
        ACE_Guard<ACE_Thread_Mutex> guard(lock_);
        return _q.empty();
    }
 
    void clear() {
        ACE_Guard<ACE_Thread_Mutex> guard(lock_);
        _q.clear();
    }
};
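
As a point of comparison, the same locking pattern can be sketched with only the C++ standard library. The class below is a hypothetical variant, not part of the article's code archive: LockedQueue and TryRemove() are names invented for illustration, and std::mutex stands in for ACE_Thread_Mutex. It folds the emptiness check and the removal into one call, so a caller never has to pair empty() with Remove() across two separate lock acquisitions.

```cpp
#include <cassert>
#include <list>
#include <mutex>
#include <string>

// Hypothetical std-only variant of MessageQueue<T> (illustration only).
template <typename T>
class LockedQueue {
    std::list<T> q_;
    std::mutex lock_;

public:
    // Append a copy of t to the back of the queue.
    void Insert(const T& t) {
        std::lock_guard<std::mutex> guard(lock_);
        q_.push_back(t);
    }

    // Check and pop under a single lock acquisition. Returns false,
    // leaving 'out' untouched, when the queue is empty.
    bool TryRemove(T& out) {
        std::lock_guard<std::mutex> guard(lock_);
        if (q_.empty())
            return false;
        out = q_.front();
        q_.pop_front();
        return true;
    }
};

// Usage: items come out in insertion order until the queue is empty.
void QueueExample() {
    LockedQueue<std::string> q;
    q.Insert("first");
    q.Insert("second");

    std::string s;
    assert(q.TryRemove(s) && s == "first");
    assert(q.TryRemove(s) && s == "second");
    assert(!q.TryRemove(s));
}
```

With a single consumer, as in the gateway, calling empty() and then Remove() is safe because only the consumer ever removes items. The combined call mainly matters if more than one thread were to drain the queue.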

The MessageQueue<T> stores smart pointer-wrapped DDS samples.

// DDS/CPP/Gateway/defs.h
typedef std::tr1::shared_ptr<Messenger::Message> MessageType;

Messages are added to the MessageQueue by the DataReader listener's on_data_available() method.

// DDS/CPP/Gateway/DataReaderListenerImpl.cpp
void DataReaderListenerImpl::on_data_available(
    DDS::DataReader_ptr reader)
    ACE_THROW_SPEC((CORBA::SystemException))
{
    Messenger::MessageDataReader_var reader_i =
        Messenger::MessageDataReader::_narrow(reader);
 
    if (CORBA::is_nil(reader_i.in())) {
        ACE_ERROR((LM_ERROR,
            ACE_TEXT("ERROR: %N:%l: on_data_available() -")
            ACE_TEXT(" _narrow failed!\n")));
        ACE_OS::exit(-1);
    }
 
    MessageType message(new Messenger::Message);
    DDS::SampleInfo info;
 
    DDS::ReturnCode_t error = 
        reader_i->take_next_sample(*message, info);
 
    if (error == DDS::RETCODE_OK) {
 
        if (info.valid_data)
            _messageQueue.Insert(message);
 
    } else {
        ACE_ERROR((LM_ERROR,
            ACE_TEXT("ERROR: %N:%l: on_data_available() -")
            ACE_TEXT(" take_next_sample failed!\n")));
    }
}

We complete ACE_TMAIN() by running the DDSGateway Erlang node, waiting for its termination, and cleaning up when done.

        // DDS/CPP/Gateway/main.cpp
        Runner r(new DDSGateway(sname, secretcookie, port, 
            message_writer.in(), messageQueue));
        r.Start();
        r.Wait();
 
        // clean up
        participant->delete_contained_entities();
        dpf->delete_participant(participant.in());
 
        TheTransportFactory->release();
        TheServiceParticipant->shutdown();
 
    } catch (const CORBA::Exception& e) {
        e._tao_print_exception("Exception caught in main():");
        return -1;
    } catch (const std::exception& e) {
        std::cerr << "Exception caught in main(): " << e.what() 
            << std::endl;
        return -1;
    }
 
    return 0; 
}

Now that the infrastructure is in place, we can implement DDSGateway, the core of the gateway process. DDSGateway maintains a reference to the Message DataWriter, allowing it to receive Erlang messages and publish them as DDS ones. DDSGateway also maintains a reference to the MessageQueue from the DDS subscriber, and a collection of Erlang process IDs, allowing it to receive messages from the DDS subscriber, and send them as Erlang ones. The constructor also sets the Erlang node name, cookie, and listening port.

// DDS/CPP/Gateway/DDSGateway.h
class DDSGateway : public ErlangNode {
    Messenger::MessageDataWriter_var _messageWriter;
    std::set<erlang_pid> _subscribers;
    MessageQueue<MessageType> &_messageQueue;
public:
    DDSGateway(std::string shortName, std::string secretCookie, 
        int port, Messenger::MessageDataWriter_ptr messageWriter, 
        MessageQueue<MessageType> &messageQueue) :
    ErlangNode(shortName, secretCookie, port),
        _messageWriter(messageWriter), 
        _messageQueue(messageQueue) {}
 
    virtual bool OnMessage(erlang_pid /*from*/, XBuff& /*buff*/);
    virtual bool OnIdle();
};

Implementation of DDSGateway is straightforward, as we only need to override OnMessage() and OnIdle(). In OnMessage(), we decode the start of every incoming message the same way: the version, then a tuple header, then an atom indicating the type of message being sent.

// DDS/CPP/Gateway/DDSGateway.cpp
bool DDSGateway::OnMessage(erlang_pid /*from*/, XBuff &buff) {
    // Process these messages:
    // { publish, { <string>, <long> } }
    // { subscribe, pid }
    // { unsubscribe, pid }
    // { shutdown }
 
    XBuffDecoder d = buff.GetDecoder();
    d.GetVersion();
    d.GetTupleHeader();
    std::string cmd = d.GetAtom();

If the message is shutdown, we terminate the Erlang node by returning false.

    // on shutdown, exit
    if (cmd == "shutdown")
        return false;  

If the message is publish, we extract the message and ID values, assign them to a DDS Messenger::Message structure, and publish it as a DDS sample.

    // publish
    if (cmd == "publish") {
        d.GetTupleHeader();
        Messenger::Message message;
        message.msg = d.GetString().c_str();
        message.id = d.GetLong();
 
        DDS::ReturnCode_t err = 
            _messageWriter->write(message, DDS::HANDLE_NIL);
        if (err!=DDS::RETCODE_OK)
            throw MyException("DDS write failed"); 
    }

Finally, if a subscribe or unsubscribe message is received, the supplied PID is added to or removed from the subscriber collection, respectively.

    // subscribe
    if (cmd == "subscribe")
        _subscribers.insert(d.GetPID());
 
    // unsubscribe
    if (cmd == "unsubscribe")
        _subscribers.erase(d.GetPID());
 
    return true;
}
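
To make the decoding steps concrete, the following is a minimal sketch of what reading the version, tuple header, and atom amounts to at the byte level, based on the Erlang external term format. It is not the XBuffDecoder implementation; CommandHeader and DecodeCommandHeader() are hypothetical names. The sketch assumes the outer tuple uses the small-tuple tag and that the atom uses the older ATOM_EXT tag. Newer Erlang releases may emit UTF-8 atom tags instead, which this sketch rejects.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Tag values from the Erlang external term format.
enum : unsigned char {
    kVersionMagic  = 131,  // leading version byte
    kSmallTupleExt = 104,  // followed by a 1-byte arity
    kAtomExt       = 100   // followed by a 2-byte big-endian length, then the name
};

// Hypothetical result type for this sketch.
struct CommandHeader {
    unsigned arity;       // number of elements in the outer tuple
    std::string command;  // first element, expected to be an atom
};

// Decodes <version><small tuple header><atom> from the front of buf.
// Throws std::runtime_error on any other input, including truncation.
CommandHeader DecodeCommandHeader(const std::vector<unsigned char>& buf) {
    std::size_t i = 0;
    auto next = [&]() -> unsigned char {
        if (i >= buf.size()) throw std::runtime_error("truncated term");
        return buf[i++];
    };

    if (next() != kVersionMagic) throw std::runtime_error("bad version byte");
    if (next() != kSmallTupleExt) throw std::runtime_error("expected a tuple");
    CommandHeader h;
    h.arity = next();
    if (next() != kAtomExt) throw std::runtime_error("expected an atom");
    std::size_t len = static_cast<std::size_t>(next()) << 8;  // high byte
    len |= next();                                            // low byte
    for (std::size_t n = 0; n < len; ++n)
        h.command.push_back(static_cast<char>(next()));
    return h;
}

// Usage: the term {shutdown} as it would appear on the wire.
void DecodeExample() {
    const std::vector<unsigned char> wire = {
        131, 104, 1, 100, 0, 8, 's', 'h', 'u', 't', 'd', 'o', 'w', 'n' };
    CommandHeader h = DecodeCommandHeader(wire);
    assert(h.arity == 1 && h.command == "shutdown");
}
```

Returning the arity alongside the command would let a handler reject a malformed message, such as a publish tuple with no payload, before decoding any further.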

In OnIdle(), we handle any DDS messages that are pending in the MessageQueue. If there are no Erlang subscribers, however, any pending messages can be discarded. As an extension, quality of service criteria could be applied here. For example, the gateway could maintain a history of samples to send to late-joining subscribers, mirroring the DDS DURABILITY policy [22].

bool DDSGateway::OnIdle() { 
    // if there are no subscribers, 
    // discard pending messages and return
    if (_subscribers.empty()) {
        _messageQueue.clear();
        return true;
    }

If there is at least one subscriber, send all pending messages to each subscriber. The subscriber list cannot change while the loop executes, as OnMessage() will not be called again until OnIdle() returns. The MessageQueue, however, can change, so repeatedly removing DDS samples until the queue is (at least momentarily) empty will ensure that all samples are handled properly.

    // otherwise, send each waiting message to all subscribers
    while (!_messageQueue.empty()) {
        MessageType m = _messageQueue.Remove();
 
        for (std::set<erlang_pid>::iterator 
            subscriber = _subscribers.begin(); 
            subscriber!=_subscribers.end(); subscriber++) {

Each DDS sample is formed into an Erlang 3-tuple containing the atom message, the message text, and the ID value. The message is then sent to the subscriber in question.

            XBuff rtn(true);
            XBuffEncoder e=rtn.GetEncoder();
            e.SetTupleHeader(3);
            e.SetAtom("message");
            e.SetString(m->msg.in());
            e.SetLong(m->id);
            Send(*subscriber, rtn);
        }
    }
    return true;
}
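
The history extension mentioned in the discussion of OnIdle() could be sketched as a bounded buffer of recent samples. This is an illustration under stated assumptions rather than part of the gateway: Sample is a hypothetical stand-in for Messenger::Message, SampleHistory is an invented name, and the fixed depth only loosely resembles the combination of the DDS DURABILITY and HISTORY policies.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Hypothetical stand-in for Messenger::Message.
struct Sample {
    std::string msg;
    long id;
};

// Keeps the most recent 'depth' samples so they can be replayed later.
class SampleHistory {
    std::deque<Sample> samples_;
    std::size_t depth_;

public:
    explicit SampleHistory(std::size_t depth) : depth_(depth) {}

    // Record a sample, evicting the oldest once the depth is reached.
    void Record(const Sample& s) {
        if (depth_ == 0)
            return;
        if (samples_.size() == depth_)
            samples_.pop_front();
        samples_.push_back(s);
    }

    // Snapshot of the retained samples, oldest first.
    std::vector<Sample> Replay() const {
        return std::vector<Sample>(samples_.begin(), samples_.end());
    }
};

// Usage: with a depth of 2, only the two newest samples are retained.
void HistoryExample() {
    SampleHistory history(2);
    history.Record(Sample{"Message", 3});
    history.Record(Sample{"Message", 2});
    history.Record(Sample{"Message", 1});

    std::vector<Sample> replay = history.Replay();
    assert(replay.size() == 2);
    assert(replay[0].id == 2 && replay[1].id == 1);
}
```

In the gateway, OnIdle() would call Record() on each sample it removes from the queue instead of discarding samples when there are no subscribers, and the subscribe branch of OnMessage() would send the result of Replay() to the newly added PID.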

This completes the implementation of the gateway. After compiling all C++ code, as well as dds.erl, we can now run the publisher and subscriber tests. As with the CORBA tests, we will use Perl test runners, but these will be based on the test runners in the OpenDDS distribution. The output from the tests below has been abbreviated somewhat for ease of discussion — additional log messages will be displayed by the test runners.

To execute the publisher test, run DDS/Test/run_test_ep.pl. The test script performs the actions described below.

The Erlang port mapper daemon, epmd, must be running for the gateway to register its listening socket. We ensure that an instance is started by running an Erlang node that executes the init_stop() function and terminates immediately. Although the node has shut down, epmd remains running. The test runs a command similar to:

<ERL_ROOT>/bin/erl -sname dummy -setcookie secretcookie 
    -pa ../Erlang -noshell -run dds init_stop

Next, we start the DCPSInfoRepo:

<DDS_ROOT>\bin\.\DCPSInfoRepo.EXE -ORBDebugLevel 10 
    -ORBLogFile DCPSInfoRepo.log -o repo.ior

Next, we start the gateway as Erlang node alice:

..\CPP\Gateway\.\gateway.EXE -ORBSvcConf tcp.conf -sname alice 
    -setcookie secretcookie -port 8001

Next, we start a C++ DDS subscriber that subscribes to the topic that is being published. As the code is substantially similar to the subscriber-side of the gateway, it is not described here but is present in the code archive.

..\CPP\Subscriber\.\subscriber.EXE -ORBSvcConf tcp.conf

Finally, we start the Erlang publisher_test() function itself in Erlang node bob. We pass the address of the gateway (here, alice@oci1373) and the message to publish as arguments to publisher_test().

<ERL_ROOT>/bin/erl -sname bob -setcookie secretcookie 
    -pa ../Erlang -noshell 
    -run dds publisher_test alice@oci1373 Message

As the test runs, the Erlang node prints each message as it is sent, the gateway publishes each one as a DDS sample, and the C++ subscriber prints each message as it is received.

[Erlang bob@oci1373] Sent to gateway: msg='"Message"' id=10
[Subscriber] Received: msg='Message' id=10
[Erlang bob@oci1373] Sent to gateway: msg='"Message"' id=9
[Subscriber] Received: msg='Message' id=9
[Erlang bob@oci1373] Sent to gateway: msg='"Message"' id=8
[Subscriber] Received: msg='Message' id=8
[Erlang bob@oci1373] Sent to gateway: msg='"Message"' id=7
[Subscriber] Received: msg='Message' id=7
[Erlang bob@oci1373] Sent to gateway: msg='"Message"' id=6
[Subscriber] Received: msg='Message' id=6
[Erlang bob@oci1373] Sent to gateway: msg='"Message"' id=5
[Subscriber] Received: msg='Message' id=5
[Erlang bob@oci1373] Sent to gateway: msg='"Message"' id=4
[Subscriber] Received: msg='Message' id=4
[Erlang bob@oci1373] Sent to gateway: msg='"Message"' id=3
[Subscriber] Received: msg='Message' id=3
[Erlang bob@oci1373] Sent to gateway: msg='"Message"' id=2
[Subscriber] Received: msg='Message' id=2
[Erlang bob@oci1373] Sent to gateway: msg='"Message"' id=1
[Subscriber] Received: msg='Message' id=1
[Erlang bob@oci1373]: done

The subscriber test starts similarly. When the test script DDS/Test/run_test_es.pl is executed, an Erlang node is run, executing the init_stop() function, to ensure that epmd has started. Next, the DCPSInfoRepo is started, as well as the gateway on node alice.

<ERL_ROOT>/bin/erl -sname dummy -setcookie secretcookie 
    -pa ../Erlang -noshell -run dds init_stop
<DDS_ROOT>\bin\.\DCPSInfoRepo.EXE -ORBDebugLevel 10 
    -ORBLogFile DCPSInfoRepo.log -o repo.ior
..\CPP\Gateway\.\gateway.EXE -ORBSvcConf tcp.conf -sname alice 
    -setcookie secretcookie -port 8001

Erlang node bob is now started to run the subscriber_test() function, which takes the address of the gateway as an argument.

<ERL_ROOT>/bin/erl -sname bob -setcookie secretcookie 
    -pa ../Erlang -noshell -run dds subscriber_test alice@oci1373

Finally, we start a C++ DDS publisher that publishes Message samples. As the code is substantially similar to the publisher-side of the gateway, it is not described here but is present in the code archive.

..\CPP\Publisher\.\publisher.EXE -ORBSvcConf tcp.conf

As the test runs, the C++ publisher prints each message as it is published, the gateway forwards the received DDS samples as Erlang messages, and the Erlang subscriber prints each message as it is received.

[Publisher] Publish: msg='Message' id=10
[Erlang bob@oci1373] Received: msg='"Message"' id=10
[Publisher] Publish: msg='Message' id=9
[Erlang bob@oci1373] Received: msg='"Message"' id=9
[Publisher] Publish: msg='Message' id=8
[Erlang bob@oci1373] Received: msg='"Message"' id=8
[Publisher] Publish: msg='Message' id=7
[Erlang bob@oci1373] Received: msg='"Message"' id=7
[Publisher] Publish: msg='Message' id=6
[Erlang bob@oci1373] Received: msg='"Message"' id=6
[Publisher] Publish: msg='Message' id=5
[Erlang bob@oci1373] Received: msg='"Message"' id=5
[Publisher] Publish: msg='Message' id=4
[Erlang bob@oci1373] Received: msg='"Message"' id=4
[Publisher] Publish: msg='Message' id=3
[Erlang bob@oci1373] Received: msg='"Message"' id=3
[Publisher] Publish: msg='Message' id=2
[Erlang bob@oci1373] Received: msg='"Message"' id=2
[Publisher] Publish: msg='Message' id=1
[Erlang bob@oci1373] Received: msg='"Message"' id=1
[Publisher] Publish: msg='Message' id=0
[Erlang bob@oci1373]: done

CONCLUSION

As shown in this article, CORBA and DDS can be used with systems written in Erlang. While CORBA has direct support in the Erlang distribution, DDS must be used via the external code interface. By using the framework presented here, supporting different CORBA interfaces and DDS sample types is straightforward.

REFERENCES

[1] Open-source Erlang - White Paper
http://erlang.org/white_paper.html

[2] Where is Erlang used and why?
http://stackoverflow.com/questions/1636455/where-is-erlang-used-and-why

[3] The CouchDB Project
http://couchdb.apache.org/

[4] RabbitMQ
http://www.rabbitmq.com/

[5] Advanced Message Queuing Protocol
http://www.amqp.org/

[6] The ACE ORB (TAO)
http://www.theaceorb.com/

[7] OpenDDS
http://www.opendds.org/

[8] Armstrong. Programming Erlang, Software for a Concurrent World. Pragmatic Bookshelf, 2007.

[9] Cesarini, Thompson. Erlang Programming. O'Reilly, 2009.

[10] Logan, Merritt, Carlsson. Erlang and OTP in Action. Manning, 2011.

[11] www.trapexit.org
http://www.trapexit.org/

[12] Open Source Erlang
http://www.erlang.org/

[13] erldocs.com
http://erldocs.com/

[14] Installing Orber
http://www.erlang.org/doc/apps/orber/ch_install.html

[15] Orber Examples
http://erlang.mirror.su.se/documentation/doc-5.4.8/lib/orber-3.6/doc/html/ch_example.html

[16] Multi-Language CORBA Development with C++ (TAO), Java (JacORB), Perl (opalORB), and C# (IIOP.NET)
http://mnb.ociweb.com/mnb/MiddlewareNewsBrief-200904.html

[17] OMG IDL to Erlang Mapping
http://www.erlang.org/doc/apps/orber/ch_idl_to_erlang_mapping.html

[18] [Mnesia] Introduction
http://www.erlang.org/doc/apps/mnesia/Mnesia_chap1.html

[19] Orber options
http://www.erlang.org/doc/apps/orber/ch_install.html#config

[20] erl
http://www.erlang.org/doc/man/erl.html

[21] C Nodes
http://www.erlang.org/doc/tutorial/cnode.html

[22] OpenDDS Developer's Guide
http://download.ociweb.com/OpenDDS/OpenDDS-latest.pdf

[23] ei_connect
http://www.erlang.org/doc/man/ei_connect.html

[24] MPC
https://objectcomputing.com/products/mpc
