The CORBA Notification Service: 
JacORB and TAO Interoperability

by Heather Drury 
Product Services Manager and Senior Software Engineer 
Object Computing, Inc. (OCI)

This month's CORBA News Brief from OCI provides a high-level overview of the OMG Notification Service and illustrates using the TAO 1.2a Notification Service interoperating with a JacORB v1.4 consumer.

Overview of the Notification Service

The  Notification Service Specification was originally produced by the Telecommunications Working Group within the Object Management Group (OMG), an international consortium of over 800 companies, and was later adopted as a standard service.  The goal was to extend the more basic OMG Event Service specification to support telecommunication applications yet remain backward compatible with the standard Event Service. The Notification Service preserves all the semantics specified for the OMG Event Service, allowing for interoperability between basic Event Service and Notification Service clients.  

Both the Notification and Event services enable events (with an optional data payload) to be sent and received between objects in a decoupled fashion. This provides a more flexible mechanism for message transmission than if events were transmitted directly.  Analogous to the standard OMG Event Service, the Notification model utilizes an event channel as the substrate for the communication of messages between client applications. Applications that provide messages are termed suppliers while applications that receive (or consume) messages are known as consumers. Suppliers and consumers are completely decoupled from one another (i.e., suppliers have no knowledge of which consumers are listening for their messages and vice versa).  While the OMG Event Service supports asynchronous exchange of event messages, it has several serious limitations including no support for event filtering and no ability to be configured for varying levels of qualities of service (QoS). The Notification Service was designed to enhance the OMG Event Service by adding the following features:

Figure 1 shows the general architecture of the Notification Service. Suppliers and consumers of a notification channel are connected via their associated proxies. Administration interfaces on the supplier side and consumer side provide the ability to cluster a set of proxies based on common configurations.

Figure 1: Notification Service Architecture

TAO Notification Service

TAO 1.2a includes an implementation of the OMG Notification Service. Highlights of the 1.2a release include support for the following features:

Example using TAO and JacORB

We illustrate the utility and interoperability of the Notification Service by creating a simple supplier application that sends events through the TAO Notification Channel to a JacORB consumer. The TAO application (written in C++) is a push supplier that connects to the Notification Channel and pushes events.  These events are simple text messages that are comprised of a "sender", "subject" and "body".  The JacORB (written in Java) consumer is a push consumer that connects to the channel and asynchronously receives event notifications from the channel. Both the supplier and consumer use the structured event type. The example is composed of three applications as illustrated in Figure 2 and further described below:

Figure 2: Overview of TAO/JacORB example.

  1. TAO  MessengerClient: a simple C++ application that connects to the TAO Naming Service, looks up the MessengerServer object reference and invokes the send_message() method on that object to send two messages, one with "TAO" as the subject and the other with "JacORB" as the subject.  The MessengerClient application does not directly interact with the Notification service in any way. We will not describe the MessengerClient application in any further detail.
  2. TAO  MessengerServer: a C++ application that plays the role of the server for the MessengerClient and the supplier for the JacORB consumer. The Notification channel is created within this application. The implementation for the Messenger servant contains a send_message() method that is invoked from the MessengerClient and packages the incoming data (with the associated "subject", "sender", and "body") into a structured event and invokes the push_structured_event() method on the consumer proxy object. 
  3. JacORB Consumer: a Java application that uses JacORBv1.4 to connect to the TAO Naming Service, look up the Notification Event channel, and receive incoming events.  This application also utilizes the filtering functionality of the TAO Notification Service to limit incoming events to those that contain "JacORB" in their subject and to ignore all others.
  4. TAO Notification Service: service use to set up notification channel and associated administrative objects.
  5. TAO Naming Service: naming service used to locate CORBA objects.

TAO C++ Push Supplier Application

The MessengerServer main() method initializes the ORB (TAO), creates the Messenger servant, and registers the Messenger object with the TAO Naming Service. The code for the MessengerServer.cpp class is shown below:

#include <orbsvcs/CosNamingC.h>
#include "Messenger_i.h"
#include <ace/streams.h>

int
main(int argc, char * argv[])
{
  try
  {
    // Initialize orb
    CORBA::ORB_var orb = CORBA::ORB_init(argc, argv);
    
    // Find the Naming Service.       
    CORBA::Object_var rootObj = 
      orb->resolve_initial_references("NameService");
    CosNaming::NamingContext_var rootNC =
      CosNaming::NamingContext::_narrow(rootObj.in());
    
    // Get the  Root POA.
    CORBA::Object_var obj = orb->resolve_initial_references("RootPOA");
    PortableServer::POA_var poa = PortableServer::POA::_narrow(obj.in());
    
    // Activate POA manager
    PortableServer::POAManager_var mgr = poa->the_POAManager();
    mgr->activate();
    
    // Create our Messenger servant.
    
		 messenger_servant(orb.in());
    
    // Register it with the RootPOA.
    PortableServer::ObjectId_var oid = 
      poa->activate_object( &messenger_servant );
    CORBA::Object_var messenger_obj = poa->id_to_reference( oid.in() );
    
    // Bind it in the Naming Service.
    CosNaming::Name name;
    name.length (1);
    name[0].id = CORBA::string_dup("MessengerService");
    rootNC->rebind(name, messenger_obj.in());
    
    // Accept requests
    orb->run(); 
    orb->destroy();
    
  }
  catch (CORBA::Exception& ex) {
    cerr << ex << endl;
    return 1;
  }
  return 0;
  
}

We now implement the structured push supplier that connects to the Notification Service and sends structured events when its send_message() method is invoked.  The steps involved in connecting the supplier to the event channel are illustrated in steps 1-6 in Figure 3 and further described below. The corresponding lines of code are also identified below within the MessengerServer example.


Figure 3: TAO C++ Messenger Server Supplier

 Obtain an object reference to the event channel factory.

 Obtain an event channel.

 Obtain the SupplierAdmin object reference.

 Obtain a structured push consumer proxy object.

 Receive an incoming message from MessengerClient application.

 Connect to the proxy to push the structured event.

The Messenger_i class implements the Messenger interface (containing the send_message() method) and also acts as an event supplier to the TAO Notification Channel. The constructor for the Messenger servant initializes the ORB and uses the TAO Naming Service ("NameService") to look up the TAO Notification Service channel factory. The Notification Channel is then bound to the root naming context of the TAO Naming Service under the name "MyEventChannel." 

Messenger_i::Messenger_i (CORBA::ORB_ptr orb)
: orb_ (CORBA::ORB::_duplicate(orb))

{
  try
  {
    CORBA::Object_var poa_obj = orb->resolve_initial_references("RootPOA");
    PortableServer::POA_var poa = PortableServer::POA::_narrow(poa_obj.in());
    
    CORBA::Object_var naming_obj =
      orb_->resolve_initial_references ("NameService");
    
    if (CORBA::is_nil(naming_obj.in())) {
      cerr << "Unable to find naming service" << endl;
    } 
    
    CosNaming::NamingContext_var naming_context =
      CosNaming::NamingContext::_narrow(naming_obj.in());
    
    //  Create an instance of TAO's notification event channel    
    CosNotifyChannelAdmin::EventChannelFactory_var notify_factory =
      TAO_Notify_EventChannelFactory_i::create(poa.in());
        
    CosNotifyChannelAdmin::ChannelID id;
    CosNotification::QoSProperties initial_qos;
    CosNotification::AdminProperties initial_admin;
    
    CosNotifyChannelAdmin::EventChannel_var ec =
      notify_factory->create_channel (initial_qos,
      initial_admin,
      id);
    
    if (CORBA::is_nil (ec.in())) {
      cerr << "Unable to crete event channel" << endl;
    }    
    
    CosNaming::Name name(1);
    name.length(1);
    name[0].id = CORBA::string_dup("MyEventChannel");
    
    naming_context->rebind(name, ec.in());
    
    CosNotifyChannelAdmin::AdminID adminid; 
    CosNotifyChannelAdmin::InterFilterGroupOperator ifgop = 
      CosNotifyChannelAdmin::AND_OP;  
    
    CosNotifyChannelAdmin::SupplierAdmin_var supplier_admin =
      ec->new_for_suppliers (ifgop, adminid);
    
    if (CORBA::is_nil (supplier_admin.in())) {
      cerr << "Unable to find supplier admin" << endl;
    }    
    
    CosNotifyChannelAdmin::ProxyID supplieradmin_proxy_id;
    
    CosNotifyChannelAdmin::ProxyConsumer_var proxy_consumer =
      supplier_admin->obtain_notification_push_consumer(
      CosNotifyChannelAdmin::STRUCTURED_EVENT,
      supplieradmin_proxy_id);
    
    StructuredEventSupplier_i *servant = 
      new StructuredEventSupplier_i(orb_.in()); 
    
    PortableServer::ObjectId_var oid = poa->activate_object(servant);
    CORBA::Object_var supplier_obj = poa->id_to_reference(oid.in());
    CosNotifyComm::StructuredPushSupplier_var supplier = 
      CosNotifyComm::StructuredPushSupplier::_narrow(supplier_obj.in());
    
    consumer_proxy_ = 
      CosNotifyChannelAdmin::StructuredProxyPushConsumer::
      _narrow(proxy_consumer.in());
    
    if (CORBA::is_nil (consumer_proxy_.in())) {
      cerr << "Unable to find structured proxy push consumer" << endl;
    }   
    consumer_proxy_->connect_structured_push_supplier(supplier.in());
    
  }
  catch (CORBA::Exception &ex) {   
    cerr << ex << endl;
  }  
}

The send_message() method of the Messenger Servant formats the message and then creates a new structured event and populates it with the contents of the message.  The push_structured_event() operation of the structured push consumer proxy object is used to push the event to the notification channel.

CORBA::Boolean Messenger_i::send_message (const char * user_name,
                                          const char * subject,
                                          char *& message)
                                          throw (CORBA::SystemException)
{
  
  cout << "Message from: " << user_name << endl;
  cout << "Subject:      " << subject << endl;
  cout << "Message:      " << message << endl;
  
  try
  {
    
    // Event Definition
    CosNotification::StructuredEvent event; 
    
    event.header.fixed_header.event_type.domain_name = 
      CORBA::string_dup("OCI");
    // string
    event.header.fixed_header.event_type.type_name = 
      CORBA::string_dup("examples");
    // string
    event.header.fixed_header.event_name = 
      CORBA::string_dup("myevent");
    
    // sequence<Property>: string name, any value
    event.filterable_data.length (1);
    event.filterable_data[0].name = CORBA::string_dup("From");
    event.filterable_data[0].value <<= (const char *)user_name;
    event.filterable_data.length (2);
    event.filterable_data[1].name = CORBA::string_dup("Subject");
    event.filterable_data[1].value <<= (const char *)subject;
    event.filterable_data.length (3);
    event.filterable_data[2].name = CORBA::string_dup("Message");
    event.filterable_data[2].value <<= (const char *)message;    
    
    consumer_proxy_->push_structured_event(event);
  }
  
  catch (CosNotifyComm::InvalidEventType &) {
    cerr << "Invalid Event Type Exception " << endl;
    return 1;
  }
  
  catch (CORBA::Exception &ex) {   
    cerr << ex << endl;
    return 1;
  }
  return 0;
}	

Lastly, the supplier class StructuredEventSupplier_i, which implements the CosNotifyComm:StructuredPushSupplier IDL interface, must be implemented. It contains two operations:

The constructor for this class simply duplicates the ORB reference.

StructuredEventSupplier_i::StructuredEventSupplier_i(CORBA::ORB_ptr orb)
: orb_(CORBA::ORB::_duplicate(orb))
{
}

The implementation for the subscription_change() method does nothing for the current implementation. The implementation for the disconnect_structured_push_supplier() deactivates the supplier object.

void StructuredEventSupplier_i::disconnect_structured_push_supplier ()
	throw (CORBA::SystemException)
{
  
  CORBA::Object_var obj = orb_->resolve_initial_references ("POACurrent");
  PortableServer::Current_var current = 
    PortableServer::Current::_narrow (obj.in());
  PortableServer::POA_var poa = current->get_POA ();
  PortableServer::ObjectId_var objectId = current->get_object_id ();
  poa->deactivate_object (objectId.in());
  
}

JacORB Java Push Consumer Application

We now implement the JacORB consumer which implements a structured push consumer that connects to the TAO Notification Channel and prints all the structured events it receives. The consumer uses the filtering functionality of the Notification Service to set up a constraint to allow only messages whose subjects contain "JacORB" to be received.

Figure 4: JacORB Consumer

  Obtain an object reference to the event channel.

  Obtain the ConsumerAdmin object reference.

  Use the default filter factory to create a new filter; add the filter to the ConsumerAdmin.

  Obtain a structured push supplier proxy object.

  Connect to the proxy.

The JacORB consumer is implemented in the Java class JacORBConsumer shown below:

package JacORBConsumer;

import org.omg.CosNotification.*;
import org.omg.CosNotifyComm.*;
import org.omg.CosNotifyChannelAdmin.*;
import org.omg.CosNotifyFilter.*;
import org.omg.CosNaming.*;
import org.omg.CosNaming.NamingContextPackage.*;
import org.omg.PortableServer.*;

import java.io.*;
import java.util.*;
import java.net.*;

public class Consumer extends StructuredPushConsumerPOA
{
  public static void main( String[] args )
  {
    try {
      
      System.out.println ("Initialize orb\n");
      org.omg.CORBA.ORB orb = org.omg.CORBA.ORB.init(args, null);
      
     
      // get naming service reference and context
      BufferedReader reader = new BufferedReader(new FileReader("ns.ior"));
      String ns = reader.readLine();
      org.omg.CORBA.Object obj = orb.string_to_object(ns);
      
      NamingContext rootContext = NamingContextHelper.narrow( obj );
      System.out.println("Got Root context: "+ obj);
     
      // find the event channel reference
      NameComponent[] name = { new NameComponent( "MyEventChannel", "" )};
      obj = rootContext.resolve(name);
      EventChannel channel = EventChannelHelper.narrow(obj);
      System.out.println ("**************MyChannel is " + channel.toString ());
      
      // get the admin interface and the supplier proxy
      InterFilterGroupOperator ifgop = InterFilterGroupOperator.AND_OP;
      org.omg.CORBA.IntHolder adminId = new org.omg.CORBA.IntHolder(0);
      ConsumerAdmin consumerAdmin = channel.new_for_consumers(ifgop, adminId);

      // get the default filter factory
      FilterFactory filterFactory = channel.default_filter_factory();
      Filter filter = null;
      if ( filterFactory == null ){
      	System.err.println ("No default filter factory found!");
      }
      try {
      	filter = filterFactory.create_filter("TCL");
      }
      catch (Exception e){
        System.err.println( "ERROR: " + e );
        e.printStackTrace( System.err );
      }

      String expr = "Subject == 'JacORB'";

      ConstraintExp exp[] = new ConstraintExp[1];
      EventType eventType[] = new EventType [0];
      exp[0] = new ConstraintExp (eventType, expr);
      try {
        ConstraintInfo info[] = filter.add_constraints (exp);
        consumerAdmin.add_filter(filter);
      }
      catch (InvalidConstraint ex) {
        System.err.println( "ERROR: " + e );
        e.printStackTrace( System.err );
      }

      EventType added[] = new EventType[1];
      EventType removed[] = new EventType [0];      
      added[0] = new EventType ("*", "*");      
      try{
        consumerAdmin.subscription_change(added, removed);
      }
      catch (Exception e){
        System.err.println( "ERROR: " + e );
        e.printStackTrace( System.err );
      }

      POA poa = org.omg.PortableServer.POAHelper.narrow 
      		(orb.resolve_initial_references("RootPOA"));

      // create and implicitly activate the client
      StructuredPushConsumer structuredPushConsumer = 
      		(StructuredPushConsumer)new Consumer()._this(orb);
      		   
      // get the stuctured proxy push supplier
      ClientType clientType = ClientType.STRUCTURED_EVENT;
      org.omg.CORBA.IntHolder proxyId = new org.omg.CORBA.IntHolder (0);
      ProxySupplier proxySupplier = null;
      try{
      	proxySupplier = consumerAdmin.obtain_notification_push_supplier (clientType, proxyId);
      }
      catch( AdminLimitExceeded e ){
		System.err.println ( "ERROR: " + e);
		e.printStackTrace( System.err );
      }
      StructuredProxyPushSupplier proxyPushSupplier = 
      		StructuredProxyPushSupplierHelper.narrow(proxySupplier);
      		
      // connect ourselves to the event channel
      proxyPushSupplier.connect_structured_push_consumer(structuredPushConsumer);   
   	  
      poa.the_POAManager().activate();
      System.out.println ("run the orb");
      orb.run();
    } 
    // Catch exceptions
    catch ( Exception e ) {
       System.err.println( "ERROR: " + e );
       e.printStackTrace( System.err );
    }
    System.out.println("Normal Termination...");
  } 
  
  public void disconnect_structured_push_consumer (){
  	System.out.println ("disconnect_structured_push_consumer invoked");
  }
  
  public void offer_change (EventType added[], EventType removed[]){
  	System.out.println ("offer_change invoked");
  }
  
  public void push_structured_event (StructuredEvent event){
  	try {
  	  System.out.println ("\nevent name is: " + event.header.fixed_header.event_name);
  	  System.out.println ("domain name is: " + event.header.fixed_header.event_type.domain_name);
  	  System.out.println ("type name is: " + event.header.fixed_header.event_type.type_name);
  	  for (int i = 0; i < event.filterable_data.length; i++){
  	  	System.out.println (event.filterable_data[i].name + ":\t" + event.filterable_data[i].value);
  	  }
  	}
    catch ( Exception e ) {
       System.err.println( "ERROR: " + e );
       e.printStackTrace( System.err );
    }
  }
}

The consumer locates the TAO Notification service and invokes the new_for_consumers() method on the channel interface to get the ConsumerAdmin object reference. The consumer uses the default_filter_factory() to create a filter that utilizes the TCL constraint language. A constraint is created ("Subject == 'JacORB'") and added to the filter using the add_constraints() method on the filter. The filter is then added to the ConsumerAdmin object using the add_filter() method.  Invoking the obtain_notification_push_supplier() operation on the ConsumerAdmin object obtains the supplier push proxy.  Finally, the proxy is connected and the consumer waits for events to be pushed.  Events are printed in the push_structured_event() method.  Invoking the disconnect_structured_push_consumer() operation, by the channel, will disconnect the consumer at any point in time (care should be taken in the disconnection of suppliers and consumers).  The offered_change() operation does nothing in this example, but an intelligent consumer may, for example, disconnect when the channel no longer offers the types of events that are of interest.

Summary

This article focused on illustrating how to use the TAO Notification Service in conjunction with a JacORB consumer.  The full source code of the example illustrated in this article can be found here.  There are many resources available that provide greater detail into the OMG Notification Service, TAO, and JacORB. These include the OMG Notification Service specification and Java Programming with CORBA by Brose, Vogel, and Duddy.  In depth information on CORBA programming with TAO can be found from the the recently released TAO 1.2a Developer's Guide. Other related resources can be found by visiting the links listed in the References section.

References

  1. TAO at Washington University, St. Louis - http://tao.doc.wustl.edu 
  2. OCI TAO: The ACE ORB - http://www.theaceorb.com
  3. OCI TAO Resources - http://www.theaceorb.com/references/
  4. JacORB - http://www.jacorb.org
  5. OMG Notification Service Specification - http://www.omg.org/cgi-bin/doc?formal/2000-06-20

OCI Educational Services

Object Computing, Inc (OCI) has been providing educational services to clients, industries and universities since 1993. We offer one of the most comprehensive distributed Object Oriented training curricula in the country. These curricula focus on the fundamentals of OO technology; with close to 40 workshops in OOAD, Java, XML, C++/CORBA and Unix/Linux.

Java C/C++ .NET/C# Real-Time Systems Object Oriented Software Engineering Distributed Computing Wireless Enterprise Unix/Linux XML

For further information regarding OCI's Educational Services programs, please visit our Educational Services section on the web or contact us at training.


The OCI CORBA News Brief is intended to promote CORBA and object technology in the development of distributed computing applications. Each issue of the CORBA News Brief will feature news and technical information about OCI's supported open-source ORBs (TAO and JacORB), case studies, and examples using CORBA, as well as information about OCI's educational offerings.

The OCI CORBA News Brief is published on a monthly basis. Send ideas for articles of interest to corba.

To subscribe or unsubscribe from the CNB mailing list, send mail to majordomo with the line "subscribe cnb" or "unsubscribe cnb" in the body of the message.