Real-Time CORBA Priority and Threadpool Mechanisms

Middleware News Brief (MNB) features news and technical information about Open Source middleware technologies.

Introduction

The Real-Time CORBA specification [1] defined several mechanisms to provide for end-to-end deterministic performance for CORBA operations. This specification was originally defined as an extension to CORBA 2.3 and the CORBA messaging specification. The RT CORBA extensions utilized and extended the quality-of-service (QoS) mechanisms defined in the CORBA messaging specification. The extensions defined by RT CORBA are used to support many distributed real-time and embedded (DRE) systems that require end-to-end support of quality-of-service characteristics, such as jitter and latency. Version 1.1 of the CORBA Real-Time specification [1] can be found at http://www.omg.org/cgi-bin/doc?formal/02-08-02 .

In addition, the TAO Developer's Guide [2] contains a discussion of the RT CORBA features that are available in TAO. The paper by Schmidt and Kuhns [4] also provides a good overview of RT CORBA.

The mechanisms in the Real-Time specification [1] include:

  1. A real-time Object Request Broker (ORB) and a real-time Portable Object Adapter (POA)
  2. Platform-independent real-time CORBA priority and priority mapping capabilities
  3. An interface to access and set the current thread priority
  4. A platform-independent priority model to control the priority of operation invocations
  5. A standard threadpool model, including threadpools with lanes
  6. Private connections
  7. Invocation timeouts
  8. Protocol selection and configuration
  9. A standard CORBA mutex (mutual exclusion lock) type
  10. A global scheduling service
  11. Priority banded connections

This issue of the Middleware News Brief covers the first six items in the list above: the RT ORB and RT POA, CORBA priority and priority mapping, the RT Current interface, priority models, threadpools (with and without lanes), and private connections. Future editions of the MNB will cover many of the other topics separately.

Examples included in this MNB are written in C++ using TAO (The ACE ORB).

Real-Time ORB

The RT ORB is used to create and destroy objects utilized by RT CORBA, such as mutexes and real-time CORBA policies. The RT ORB is accessed via the resolve_initial_references() method call.

The following code shows how the RT ORB can be located.

    #include <tao/corba.h>
    #include <tao/RTCORBA/RTCORBA.h>

    int main (int argc, char * argv[])
    {
      // Initialize the ORB.
      CORBA::ORB_var orb = CORBA::ORB_init (argc, argv);

      // Locate the RT ORB and narrow it to its real-time interface.
      CORBA::Object_var obj = orb->resolve_initial_references ("RTORB");
      RTCORBA::RTORB_var rt_orb = RTCORBA::RTORB::_narrow (obj.in ());

      ...

Real-Time POA

The RT POA extends the PortableServer::POA to provide object-specific priority operations, such as activate_object_with_priority.

To obtain an instance of the RT POA, an instance of the PortableServer::POA can be narrowed. In the example below, we create a child of the Root POA and narrow it to an RT POA. We also obtain the Root POA's POAManager, which is passed to the child POA.

The creation of an RT POA is not required if the object-specific priority operations are not needed. For a complete list of these operations, refer to the RT CORBA Specification [1] or the TAO Developer's Guide [2].

    ...
    // Assume the ORB is initialized and the RT ORB obtained as above.
    // Get a reference to the RootPOA.
    obj = orb->resolve_initial_references ("RootPOA");
    PortableServer::POA_var poa = PortableServer::POA::_narrow (obj.in ());

    // Activate the POAManager.
    PortableServer::POAManager_var mgr = poa->the_POAManager ();
    mgr->activate ();

    // Next, create the policy list; this is covered later.
    ...
    // Create the child POA.
    PortableServer::POA_var child_poa =
      poa->create_POA ("child_poa",
                       mgr.in (),
                       poa_policy_list);

    // Narrow the child POA to the RT POA interface.
    RTPortableServer::POA_var rt_poa =
      RTPortableServer::POA::_narrow (child_poa.in ());

Native and Real-Time CORBA Priorities

In order to provide a platform-independent approach to thread priorities, RT CORBA provides for representations of the native (platform-specific) thread priorities and RT CORBA (platform-independent) priorities. This allows clients on one platform and servers on another to invoke and process operations using a common approach for specifying priorities.

This common approach relies on RT CORBA priorities, which the client and server exchange with one another. These priorities are independent of the platform on which the operation is invoked or processed.

Native priorities are represented by a short integer (RTCORBA::NativePriority) from -32768 to +32767, even though only a subset of these values is used on any one platform. CORBA priority values are also represented by a short integer (RTCORBA::Priority), but are restricted to the range 0 to 32767.

Priority Mapping

In order to facilitate mapping of priorities from the RT CORBA priority to the native priority, a mapping type must be defined.

In TAO, this mapping can have the following values:

  • Direct. RT CORBA priorities are mapped directly to the same value in the native priority. That is, an RT CORBA priority of 100 maps to a native priority of 100. Not all RT CORBA priority values may be applicable; for example, the native priority values on a given platform may only range from 1 to 10. In addition, if the native priorities include negative values, these are never used when mapping from CORBA priorities to native priorities with the direct mapping type.
  • Linear. The entire RT CORBA priority range is mapped to the entire native priority range. So an RT CORBA priority of 0 maps to the minimum native priority, and a priority of 32767 maps to the largest value. Values in between are mapped using linear interpolation.
  • Continuous. The RT CORBA priorities are mapped one-for-one to the native priorities based on the minimum native priority value. For example, if the minimum native value is 10, then the RT CORBA priority of 5 would map to the native priority of 15.
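
The arithmetic behind the three mapping types can be sketched in plain C++. This is an illustration only, not TAO's implementation: NativeRange and the three function names are hypothetical, and the truncating integer division in the linear case is an assumption about rounding. The bool-plus-out-parameter shape loosely follows the to_native operation of the RTCORBA::PriorityMapping interface.

```cpp
#include <cassert>

// Hypothetical description of one platform's native priority range.
struct NativeRange {
    int min;  // lowest native priority the platform accepts
    int max;  // highest native priority the platform accepts
};

const int kMaxCorbaPriority = 32767;  // value of RTCORBA::maxPriority

// True when corba lies in the RT CORBA priority range 0..32767.
inline bool valid_corba(int corba) {
    return corba >= 0 && corba <= kMaxCorbaPriority;
}

// Direct: the native priority has the same value as the CORBA priority.
// Fails when that value is not a usable native priority.
bool direct_to_native(int corba, NativeRange r, int& native) {
    if (!valid_corba(corba) || corba < r.min || corba > r.max) return false;
    native = corba;
    return true;
}

// Linear: interpolate over the whole native range.
// Assumption: integer division truncates toward zero.
bool linear_to_native(int corba, NativeRange r, int& native) {
    assert(r.min <= r.max);
    if (!valid_corba(corba)) return false;
    long long span = static_cast<long long>(r.max) - r.min;
    native = static_cast<int>(r.min + (corba * span) / kMaxCorbaPriority);
    return true;
}

// Continuous: one-for-one, offset from the minimum native priority.
// Fails when the offset would run past the top of the native range.
bool continuous_to_native(int corba, NativeRange r, int& native) {
    if (!valid_corba(corba) || corba > r.max - r.min) return false;
    native = r.min + corba;
    return true;
}
```

With a native range of 10 to 99, continuous_to_native(5, ...) yields 15, matching the example in the list above, while direct_to_native(100, ...) fails on a platform whose native range is 1 to 10.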

There are several ways to specify the mapping.

One approach is to pass the mapping type to the ORB upon initialization, either as a command-line parameter or in an initialization string that is built programmatically and passed to the ORB. Provided the ORB is initialized using argc and argv as shown in the example above, the priority mapping can be set to linear with the following command-line parameter (assuming the executable is named serverMain):

    % serverMain -ORBPriorityMapping linear
    %

TAO also provides a Service Configurator mechanism to read in a configuration file upon startup. The mapping can be specified in the service configurator file using the following entry:

    static RT_ORB_Loader "-ORBPriorityMapping linear"

Real-Time CORBA Current Interface

To obtain and set the priority for the current executing thread, the RT CORBA Current interface can be used. The RT Current is obtained by using resolve_initial_references(), then narrowing the result. The following code shows an example of obtaining the RT Current and setting the current priority to the highest value CORBA allows.

    obj = orb->resolve_initial_references ("RTCurrent");
    RTCORBA::Current_var current =
      RTCORBA::Current::_narrow (obj.in ());

    // Change to a priority that matches the highest level CORBA allows.
    current->the_priority (RTCORBA::maxPriority);

Priority Models

Now that RT CORBA priorities can be used to specify priorities in a platform-independent manner, and the current thread priority can be set so that the client and server can match thread priorities for a specific operation, a mechanism must be defined to communicate between the client and server what priority will be used for the operation. This capability is provided by RT CORBA Priority Models. These models allow for end-to-end propagation of priorities, which facilitates predictability in the corresponding operations.

The RT CORBA specification [1] provides two priority models.

In the client propagated priority model, the client communicates the desired priority of the request to the server.

In the server declared priority model, the server communicates the supported priority to the client.

The following code shows one way to create a client propagated POA. This POA would be created in the server, thus allowing the client to propagate the priority to the server. As a result, a server thread would be selected using the RT CORBA priority that corresponds to the client's RT CORBA priority.

In the RT Current example above, this value would be the maximum CORBA priority.

    CORBA::PolicyList poa_policy_list (1);

    poa_policy_list.length (1);

    // Create the client propagated policy; set the default priority to zero.
    poa_policy_list[0] =
      rt_orb->create_priority_model_policy (RTCORBA::CLIENT_PROPAGATED, 0);

    // Create the POA.
    PortableServer::POA_var client_propagated_poa =
      poa->create_POA ("client_propagated_poa",
                       mgr.in (),
                       poa_policy_list);

The priority model is communicated from the server to the client via the Interoperable Object Reference (IOR) the client uses to access the server. The client has the capability to verify that the IOR from the server uses the correct model. If the server-declared approach is used, the IOR will contain both the priority model and the RT CORBA Priority specified by the server.
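
The difference between the two priority models comes down to which side's priority wins when a request is dispatched. The following is a minimal sketch in plain C++, not ORB code: PriorityModel, Request, and dispatch_priority are hypothetical names. It assumes that, under the client propagated model, the server's default priority (the second argument to create_priority_model_policy) is used when a request carries no priority.

```cpp
// Hypothetical stand-ins for the two RT CORBA priority models.
enum class PriorityModel { ClientPropagated, ServerDeclared };

// Hypothetical view of an incoming request.
struct Request {
    bool  has_client_priority;  // false if the client sent no priority
    short client_priority;      // RT CORBA priority sent by the client
};

// Returns the RT CORBA priority at which the server processes the request.
short dispatch_priority(PriorityModel model,
                        short server_priority,
                        const Request& request) {
    // Client propagated: honor the priority that travelled with the request.
    if (model == PriorityModel::ClientPropagated &&
        request.has_client_priority) {
        return request.client_priority;
    }
    // Server declared, or no priority was propagated (assumed fallback):
    // use the priority configured on the server.
    return server_priority;
}
```

For a client running at the maximum CORBA priority, dispatch_priority returns 32767 under the client propagated model, while under the server declared model it returns whatever priority the server configured.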

Threadpools

To make it easy to set aside a pool of threads for the ORB to use, RT CORBA provides the capability to create and configure threadpools. This helps to ensure that a particular invocation will have an available thread at the appropriate priority to process a request.

A threadpool may be created with a single default priority, or its threads may be grouped into lanes (discussed in the next section). Remember that the priority of a thread can be modified dynamically using the RT Current mechanism described above.

Threadpools are identified by a threadpool identifier (an RTCORBA::ThreadpoolId). A POA may be associated with only a single threadpool. However, a single threadpool may be associated with more than one POA.

The threadpool is assigned using the threadpool policy included in the policy list passed to the POA upon creation. When the threadpool is created, two thread counts can be specified: the number of static threads, which are created along with the threadpool, and the number of dynamic threads, which may be added to the static threads as needed.

The following code creates a threadpool using defaults where indicated and assigns that threadpool to the POA using the threadpool policy of the POA:

    // Create a threadpool.
    RTCORBA::ThreadpoolId threadpool_id =
      rt_orb->create_threadpool (0,    // Use default stack size
                                 10,   // Number of static threads
                                 5,    // Number of dynamic threads
                                 500,  // Default priority
                                 0,    // Allow request buffering
                                 0,    // Max buffered requests
                                 0);   // Max request buffer size

    // Create and initialize the policy list.
    CORBA::PolicyList poa_policy_list (2);
    poa_policy_list.length (2);
    poa_policy_list[0] =
      rt_orb->create_priority_model_policy (RTCORBA::CLIENT_PROPAGATED, 0);
    poa_policy_list[1] =
      rt_orb->create_threadpool_policy (threadpool_id);

    // Create the POA as in the previous example.
    PortableServer::POA_var client_propagated_poa =
      poa->create_POA ("client_propagated_poa",
                       mgr.in (),
                       poa_policy_list);
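
The effect of the static and dynamic thread counts can be modeled with a small bookkeeping sketch. It does not create real threads and is not how an ORB manages its pool; PoolModel and its members are hypothetical names. It assumes that a request arriving when every static and dynamic thread is busy cannot be dispatched, since request buffering is disabled in the example above.

```cpp
#include <cassert>

// Hypothetical model of a threadpool's capacity; no real threads are created.
struct PoolModel {
    unsigned static_threads;   // created together with the threadpool
    unsigned dynamic_threads;  // upper bound on threads added on demand
    unsigned busy;             // threads currently processing a request

    // Returns true if an idle static thread, or a dynamic thread that may
    // still be created, can take the request.
    bool try_dispatch() {
        if (busy < static_threads + dynamic_threads) {
            ++busy;
            return true;
        }
        return false;  // pool exhausted and buffering is assumed disabled
    }

    // Marks one request as finished, freeing its thread.
    void complete() {
        assert(busy > 0);
        --busy;
    }

    // Number of dynamic threads needed for the current load.
    unsigned dynamic_in_use() const {
        return busy > static_threads ? busy - static_threads : 0;
    }
};
```

With 10 static and 5 dynamic threads, as in the create_threadpool call above, this model accepts 15 concurrent requests and turns away the 16th until one of the earlier requests completes.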

Threadpools with Lanes

Lanes provide threadpools the capability to have several thread priorities in the same threadpool. These threads are grouped into lanes so that all threads in the lane have the same priority.

Each threadpool lane has its own configuration for static and dynamic threads. The RT CORBA specification [1] also provides a mechanism that allows a lane of higher priority to borrow a thread from a lane of lesser priority in the same threadpool, if necessary.

The following code shows how to create the threadpool lanes and their associated threadpool. This code creates a threadpool with two lanes, starting at maxPriority and reducing by 10,000 for the second lane. The approach for assigning the threadpool to the POA is as shown in the Threadpool example above.

    // Create the threadpool lanes.
    RTCORBA::ThreadpoolLanes lanes (2);
    lanes.length (2);
    lanes[0].static_threads = 1;
    lanes[0].dynamic_threads = 3;
    lanes[0].lane_priority = RTCORBA::maxPriority;
    lanes[1].static_threads = 1;
    lanes[1].dynamic_threads = 3;
    lanes[1].lane_priority = RTCORBA::maxPriority - 10000;

    // Create the threadpool with lanes.
    RTCORBA::ThreadpoolId threadpool_id =
      rt_orb->create_threadpool_with_lanes (0,      // Stack size
                                            lanes,
                                            0,      // Allow borrowing
                                            0,      // Allow request buffering
                                            0,      // Max buffered requests
                                            0);     // Max request buffer size

Private Connections

A client can specify that a private connection should be established to a server to ensure communications between the client and server are performed on a dedicated connection. Private connections help prevent priority inversions. A priority inversion occurs when a lower priority request from the client to the server blocks a higher priority request.
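
The inversion described above can be illustrated with a toy model of a connection as a first-in, first-out queue. This is only a sketch: PendingRequest and delay_before_send are hypothetical names, and the transfer times are made-up illustrative values, not measurements.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical request waiting to be written to a connection.
struct PendingRequest {
    short    priority;     // RT CORBA priority of the request
    unsigned transfer_ms;  // illustrative time needed to send it
};

// Time the request at position index waits before it starts being sent,
// assuming the connection sends requests strictly in arrival order.
unsigned delay_before_send(const std::vector<PendingRequest>& connection,
                           std::size_t index) {
    unsigned delay = 0;
    for (std::size_t i = 0; i < index && i < connection.size(); ++i) {
        delay += connection[i].transfer_ms;
    }
    return delay;
}
```

On a shared connection, a high-priority request queued behind a low-priority request that takes 50 ms to send waits those 50 ms. On a private connection it is the only entry in its queue and waits 0 ms.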

The private connection policy can be set at any of the following:

  • The CORBA object level
  • The client thread level
  • The ORB level

An example of setting the private connection policy at the object level is shown below. In this example, the IOR for the Messenger object is obtained from a file (a name service could also have been used), the policy list is set, and the returned object is converted to a messenger object. Upon reassignment, the original CORBA::Object_var is released, since it is no longer referenced.

    // Read and destringify the Messenger object's IOR.
    obj = orb->string_to_object ("file://Messenger.ior");

    // Narrow the IOR to a Messenger object reference.
    Messenger_var messenger = Messenger::_narrow (obj.in ());

    // Set the private connection policy.
    CORBA::PolicyList policy_list (1);
    policy_list.length (1);
    policy_list[0] = rt_orb->create_private_connection_policy ();
    obj = messenger->_set_policy_overrides (policy_list,
                                            CORBA::SET_OVERRIDE);

    // Reset messenger to the new object reference that carries the policy.
    messenger = Messenger::_narrow (obj.in ());

Conclusion

In this MNB, we have explored the basics of RT CORBA priority and threadpool mechanisms. These facilities can provide for CORBA communications with much improved Quality-of-Service attributes. These attributes include operations that have reduced latency, more consistent response times (less jitter), and reduced risk of priority inversion.

For further information, please refer to the references below.

References

  1. The CORBA Real-Time specification, version 1.1, published by the OMG.
    http://www.omg.org/cgi-bin/doc?formal/02-08-02
  2. The TAO Developer's Guide is available at the OCI TAO web site
    http://www.theaceorb.com/product/index.html
  3. Henning, Michi, Steve Vinoski, Advanced CORBA Programming with C++, Addison-Wesley, 1999.
  4. Schmidt, Douglas C., and Fred Kuhns, "An Overview of the Real-Time CORBA Specification," IEEE Computer, June 2000.
  5. Pyarali, Irfan, Marina Spivak, Ron Cytron, and Douglas C. Schmidt, "Evaluating and Optimizing Thread Pool Strategies for Real-Time CORBA," Proceedings of the ACM SIGPLAN Workshop on Optimization of Middleware and Distributed Systems (OM 2001), Snowbird, Utah, June 18, 2001.
    http://www.cs.wustl.edu/~schmidt/corba-research-realtime.html

Appendix: Putting It All Together

The following files provide a complete example of using all the features of RT CORBA described above. Most error processing has been omitted for clarity.

For proper error handling, refer to the TAO Developer's Guide [2] or Henning and Vinoski [3].

    # Filename: svc.conf
    # This allows RTCORBA::maxPriority to map correctly.

    static RT_ORB_Loader "-ORBPriorityMapping linear"

    // Filename: Messenger.idl

    interface Messenger
    {
      boolean send_message (inout string message);
    };

    // Filename: Messenger_i.h
    #ifndef MESSENGER_I_H_
    #define MESSENGER_I_H_

    #include "MessengerS.h"

    // Servant class implementing the Messenger interface.
    class Messenger_i : public virtual POA_Messenger
    {
    public:
      // Constructor
      Messenger_i (void);

      // Destructor
      virtual ~Messenger_i (void);

      virtual CORBA::Boolean send_message (
          char *& message
        )
        throw (CORBA::SystemException);
    };

    #endif /* MESSENGER_I_H_ */

    // Filename: Messenger_i.cpp
    #include <tao/corba.h>

    #include "Messenger_i.h"
    #include <ace/streams.h>

    // Implementation skeleton constructor
    Messenger_i::Messenger_i (void)
    {
    }

    // Implementation skeleton destructor
    Messenger_i::~Messenger_i (void)
    {
    }

    CORBA::Boolean Messenger_i::send_message (char *& message)
      throw (CORBA::SystemException)
    {
      cout << "Message: " << message << endl;

      // An inout string is replaced by freeing the old value
      // and assigning a newly allocated one.
      CORBA::string_free (message);
      message = CORBA::string_dup ("Thanks for the message.");
      return 1;
    }

    // Filename: MessengerServer.cpp

    #include "Messenger_i.h"
    #include <ace/streams.h>
    #include <tao/RTCORBA/RTCORBA.h>
    #include <tao/RTPortableServer/RTPortableServer.h>

    int main (int argc, char *argv[])
    {
      try {

        // Initialize the ORB.
        CORBA::ORB_var orb = CORBA::ORB_init (argc, argv);

        cout << "Ready to Find RTORB" << endl;

        CORBA::Object_var obj = orb->resolve_initial_references ("RTORB");
        RTCORBA::RTORB_var rt_orb = RTCORBA::RTORB::_narrow (obj.in ());

        // Get a reference to the RootPOA.
        obj = orb->resolve_initial_references ("RootPOA");
        PortableServer::POA_var poa = PortableServer::POA::_narrow (obj.in ());

        // Activate the POAManager.
        PortableServer::POAManager_var mgr = poa->the_POAManager ();
        mgr->activate ();

        // Create the threadpool lanes.
        RTCORBA::ThreadpoolLanes lanes (2);
        lanes.length (2);
        lanes[0].static_threads = 1;
        lanes[0].dynamic_threads = 0;
        lanes[0].lane_priority = RTCORBA::maxPriority;

        lanes[1].static_threads = 1;
        lanes[1].dynamic_threads = 0;
        lanes[1].lane_priority = RTCORBA::maxPriority - 10000;

        // Create the threadpool with lanes.
        RTCORBA::ThreadpoolId threadpool_id =
          rt_orb->create_threadpool_with_lanes (0,      // Stack size
                                                lanes,
                                                0,      // Allow borrowing
                                                0,      // Allow request buffering
                                                0,      // Max buffered requests
                                                0);     // Max request buffer size

        // Combine the priority model and threadpool policies.
        CORBA::PolicyList poa_policy_list (2);

        poa_policy_list.length (2);

        poa_policy_list[0] =
          rt_orb->create_priority_model_policy (RTCORBA::CLIENT_PROPAGATED, 0);
        poa_policy_list[1] =
          rt_orb->create_threadpool_policy (threadpool_id);

        PortableServer::POA_var client_propagated_poa =
          poa->create_POA ("client_propagated_poa",
                           mgr.in (),
                           poa_policy_list);

        // Create a servant.
        Messenger_i messenger_servant;

        // Register the servant with the client propagated POA, obtain its
        // object reference, stringify it, and write it to a file.
        PortableServer::ObjectId_var oid =
          client_propagated_poa->activate_object (&messenger_servant);
        CORBA::Object_var messenger_obj =
          client_propagated_poa->id_to_reference (oid.in ());
        CORBA::String_var str = orb->object_to_string (messenger_obj.in ());
        ofstream iorFile ("Messenger.ior");
        iorFile << str.in () << endl;
        iorFile.close ();

        // Accept requests from clients.
        orb->run ();
        orb->destroy ();

      }
      catch (CORBA::Exception& ex) {
        cerr << "CORBA exception: " << ex << endl;
        return 1;
      }

      return 0;
    }

    // Filename: MessengerClient.cpp

    #include <tao/RTCORBA/RTCORBA.h>

    #include "MessengerC.h"
    #include <ace/streams.h>

    int main (int argc, char* argv[])
    {

      try {

        // Initialize the ORB.
        CORBA::ORB_var orb = CORBA::ORB_init (argc, argv);

        // Get the RTORB.
        CORBA::Object_var obj = orb->resolve_initial_references ("RTORB");
        RTCORBA::RTORB_var rt_orb = RTCORBA::RTORB::_narrow (obj.in ());

        // Read and destringify the Messenger object's IOR.
        obj = orb->string_to_object ("file://Messenger.ior");

        // Narrow the IOR to a Messenger object reference.
        Messenger_var messenger = Messenger::_narrow (obj.in ());
        if (CORBA::is_nil (messenger.in ())) {
          cerr << "IOR was not a Messenger object reference." << endl;
          return 1;
        }

        // Set the private connection policy.
        CORBA::PolicyList policy_list (1);
        policy_list.length (1);
        policy_list[0] = rt_orb->create_private_connection_policy ();
        obj = messenger->_set_policy_overrides (policy_list,
                                                CORBA::SET_OVERRIDE);

        // Reset messenger to the new object reference that carries the policy.
        messenger = Messenger::_narrow (obj.in ());

        // Get the RTCurrent.
        obj = orb->resolve_initial_references ("RTCurrent");
        RTCORBA::Current_var current =
          RTCORBA::Current::_narrow (obj.in ());

        // Change to a priority that matches the highest level on the server.
        current->the_priority (RTCORBA::maxPriority);

        // Explicitly bind a connection to the server.
        CORBA::PolicyList_var inconsistent_policies;
        CORBA::Boolean status =
          messenger->_validate_connection (inconsistent_policies.out ());

        // Send a message to the Messenger object.
        CORBA::String_var message = CORBA::string_dup ("Howdy!");

        messenger->send_message (message.inout ());

        cout << "Message String= " << message.in () << endl;

        return 0;
      }
      catch (CORBA::Exception& ex) {
        cerr << "CORBA exception: " << ex << endl;
      }

      return 1;
    }