The Kuali Service Bus (KSB) is installed as a Kuali Rice (Rice) Module using Spring. Here is an example XML snippet showing how to configure Rice and KSB using Spring:
<bean id="rice" class="org.kuali.rice.core.config.RiceConfigurer">
    <property name="dataSource" ref="workflowDataSource" />
    <property name="nonTransactionalDataSource" ref="nonTransactionalWorkflowDataSource" />
    <property name="transactionManager" ref="myJtaTransactionManager" />
    <property name="userTransaction" ref="myJtaUserTransaction" />
    <!-- additional rice configuration here -->
    <property name="properties">
        <props>
            <prop key="service.namespace">MYAPP</prop>
            <prop key="message.persistence">true</prop>
            <prop key="message.delivery">asynchronous</prop>
            <prop key="Routing.ImmediateExceptionRouting">false</prop>
            <prop key="RouteQueue.timeIncrement">1000</prop>
            <prop key="RouteQueue.maxRetryAttempts">3</prop>
            <prop key="useQuartzDatabase">true</prop>
            <prop key="ksb.org.quartz.scheduler.instanceId">AUTO</prop>
            <prop key="ksb.org.quartz.scheduler.instanceName">KSBScheduler</prop>
            <prop key="ksb.org.quartz.jobStore.isClustered">true</prop>
            <prop key="ksb.org.quartz.jobStore.tablePrefix">KR_QRTZ_</prop>
        </props>
    </property>
    <property name="modules">
        <list>
            <!-- additional rice module configuration here -->
            <bean class="org.kuali.rice.ksb.messaging.config.KSBConfigurer">
                <property name="serviceServletUrl" value="http://yourlocalip:8080/myapp/remoting" />
            </bean>
        </list>
    </property>
</bean>
The KSBTestHarnessSpring.xml located in the project folder under /ksb/src/test/resources/ is a good starting place to explore KSB configuration in depth. The first thing the file does is use a PropertyPlaceholderConfigurer to bring tokens into the Spring file for runtime configuration. The source of the tokens is the xml file: ksb-test-config.xml located in the /ksb/src/test/resources/META-INF directory.
<bean id="config" class="org.kuali.rice.core.config.spring.ConfigFactoryBean">
    <property name="configLocations">
        <list>
            <value>classpath:META-INF/ksb-test-config.xml</value>
        </list>
    </property>
</bean>
<bean id="configProperties" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
    <property name="targetObject" ref="config" />
    <property name="targetMethod" value="getProperties" />
</bean>
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="properties" ref="configProperties" />
</bean>
In the RiceConfigurer bean shown at the start of this section, properties are passed into the Rice configurer directly. These could be properties loaded by Spring and injected into the bean.
Alternatively, you could use the Rice configuration subsystem for configuration.
A JTA TransactionManager and UserTransaction are also injected into the RiceConfigurer.
As mentioned above, the PropertyPlaceholderConfigurer allows tokens to be used in the Spring file. If you are not familiar with tokens, they look like this in the Spring file: ${datasource.pool.maxSize}
Let's take a look at the ksb-test-config.xml file:
<config>
    <param name="config.location">classpath:META-INF/common-derby-connection-config.xml</param>
    <param name="config.location">classpath:META-INF/common-config-test-locations.xml</param>
    <param name="client1.location">/opt/jenkins-home/jobs/rice-trunk-sitedeploy/workspace/src/test/clients/TestClient1</param>
    <param name="client2.location">/opt/jenkins-home/jobs/rice-trunk-sitedeploy/workspace/src/test/clients/TestClient2</param>
    <param name="ksb.client1.port">9913</param>
    <param name="ksb.client2.port">9914</param>
    <param name="ksb.testharness.port">9915</param>
    <param name="threadPool.size">1</param>
    <param name="threadPool.fetchFrequency">3000</param>
    <param name="bus.refresh.rate">3000</param>
    <param name="bam.enabled">true</param>
    <param name="transaction.timeout">3600</param>
    <param name="keystore.alias">rice</param>
    <param name="keystore.password">keystorepass</param>
    <param name="keystore.file">/opt/jenkins-home/jobs/rice-trunk-sitedeploy/workspace/src/test/resources/keystore/ricekeystore</param>
    <param name="keystore.location">/opt/jenkins-home/jobs/rice-trunk-sitedeploy/workspace/src/test/resources/keystore/ricekeystore</param>
    <param name="use.clearDatabaseLifecycle">true</param>
    <param name="use.sqlDataLoaderLifecycle">true</param>
    <!-- bus messaging props -->
    <param name="message.delivery">synchronous</param>
    <param name="message.persistence">true</param>
    <param name="useQuartzDatabase">false</param>
    <param name="rice.ksb.loadModuleConfiguration">false</param>
    <param name="config.location">${additional.config.locations}</param>
    <param name="config.location">${alt.config.location}</param>
</config>
This is an XML file for configuring key-value pairs. When used in conjunction with Spring tokenization and the PropertyPlaceholderConfigurer bean, each parameter name must match the token name used in the Spring file so that the properties propagate successfully.
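As a minimal sketch of that pairing, the fragment below shows one parameter and the Spring property that consumes it. The value 10 is hypothetical and only the token name datasource.pool.maxSize comes from the examples in this section; the two parts would live in separate files.

```xml
<!-- Rice config file (for example ksb-test-config.xml); the value 10 is hypothetical -->
<config>
    <param name="datasource.pool.maxSize">10</param>
</config>

<!-- Spring file: the token name matches the param name exactly -->
<bean id="dataSource" class="org.kuali.rice.core.database.XAPoolDataSource">
    <property name="maxSize" value="${datasource.pool.maxSize}" />
</bean>
```

If the names differ, even in case, the placeholder is not resolved from this parameter.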
When doing persistent messaging it is best practice to use JTA as your transaction manager. This ensures that the messages you are sending are synchronized with the transaction currently executing in your application. It also allows message persistence to be put in a different database than the application’s logic if needed. Currently, KSBTestHarnessSpring.xml uses JOTM to configure JTA without an application server. Atomikos is another JTA product that you could consider using instead of JOTM, and Rice may adopt it in the future. Below are the JOTM and data source bean definitions from that Spring file:
<bean id="jotm" class="org.springframework.transaction.jta.JotmFactoryBean">
    <property name="defaultTimeout" value="${transaction.timeout}" />
</bean>
<bean id="dataSource" class="org.kuali.rice.core.database.XAPoolDataSource">
    <property name="transactionManager" ref="jotm" />
    <property name="driverClassName" value="${datasource.driver.name}" />
    <property name="url" value="${datasource.url}" />
    <property name="maxSize" value="${datasource.pool.maxSize}" />
    <property name="minSize" value="${datasource.pool.minSize}" />
    <property name="maxWait" value="${datasource.pool.maxWait}" />
    <property name="validationQuery" value="${datasource.pool.validationQuery}" />
    <property name="username" value="${datasource.username}" />
    <property name="password" value="${datasource.password}" />
</bean>
Atomikos configuration is very similar: configure the TransactionManager, the UserTransaction, and a DataSource. If using JOTM, use the Rice XAPoolDataSource class as your data source because it addresses some bugs in the StandardXAPoolDataSource class that it extends.
Next, you must inject JOTM into the RiceConfigurer:
<bean id="rice" class="org.kuali.rice.core.config.RiceConfigurer">
    <property name="serviceNamespace" value="KEW" />
    <property name="dataSource" ref="dataSource" />
    <property name="transactionManager" ref="jotm" />
    <property name="userTransaction" ref="jotm" />
    <property name="rootConfig" ref="config" />
    <property name="modules">
        <...more.../>
Configuring JTA from an appserver is no different, except the TransactionManager and UserTransaction are going to be fetched using a JNDI FactoryBean from Spring.
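A minimal sketch of such a JNDI lookup, using Spring's JndiObjectFactoryBean, might look like the following. The bean ids are hypothetical and the JNDI names are assumptions: java:comp/UserTransaction is the standard Java EE location, while the TransactionManager name varies by application server.

```xml
<!-- Assumed JNDI name; java:/TransactionManager is a JBoss-style example, check your server's documentation -->
<bean id="jtaTransactionManager" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiName" value="java:/TransactionManager" />
</bean>

<!-- Standard Java EE location of the UserTransaction -->
<bean id="jtaUserTransaction" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiName" value="java:comp/UserTransaction" />
</bean>
```

The transactionManager and userTransaction properties of the RiceConfigurer would then reference these two beans in place of jotm.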
The RiceConfigurer example above sets the serviceNamespace property by injecting the name directly. You can do this instead of setting the property in the configuration system.
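For comparison, the configuration-system alternative would presumably be a single parameter in a Rice config file, using the service.namespace parameter listed among the KSB configuration parameters. This is a sketch and reuses the KEW value from the example:

```xml
<config>
    <!-- intended to have the same effect as injecting serviceNamespace="KEW" into the RiceConfigurer -->
    <param name="service.namespace">KEW</param>
</config>
```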
You can configure KSB by injecting a PlatformTransactionManager into the KSBConfigurer.
This eliminates the need for JTA. Behind the scenes, KSB uses Apache OJB as its object-relational mapping (ORM) layer.
Before you can use a PlatformTransactionManager, the client application must set up OJB so that KSB can use it.
This is a good option if you are an OJB shop and you want to continue using your current setup without introducing JTA into your stack. Normally, when a JTA transaction is found, the message is not sent until the transaction commits. In this case, the message is sent immediately.
Let's take a look at the KSBTestHarnessNoJtaSpring.xml file. Instead of JTA, the following transaction and DataSource configuration is declared:
<bean id="ojbConfigurer" class="org.springmodules.orm.ojb.support.LocalOjbConfigurer" />
<bean id="transactionManager" class="org.springmodules.orm.ojb.PersistenceBrokerTransactionManager">
    <property name="jcdAlias" value="dataSource" />
</bean>
<bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
    <property name="driverClassName">
        <value>${datasource.driver.name}</value>
    </property>
    <property name="url">
        <value>${datasource.url}</value>
    </property>
    <property name="username">
        <value>${datasource.username}</value>
    </property>
    <property name="password">
        <value>${datasource.password}</value>
    </property>
</bean>
The RiceNoJtaOJB.properties file needs to include the Rice connection factory property value:
ConnectionFactoryClass=org.springmodules.orm.ojb.support.RiceDataSourceConnectionFactory
Often, the DataSource is pulled from JNDI using a Spring FactoryBean. Next, we inject the DataSource and transactionManager (now a Spring PlatformTransactionManager).
<bean id="rice" class="org.kuali.rice.core.config.RiceConfigurer">
    <property name="serviceNamespace" value="KEW" />
    <property name="dataSource" ref="dataSource" />
    <property name="nonTransactionalDataSource" ref="dataSource" />
    <property name="rootConfig" ref="config" />
    <property name="modules">
        <list>
            <bean class="org.kuali.rice.ksb.messaging.config.KSBConfigurer">
                <property name="serviceServletUrl" value="http://yourlocalip:${ksb.testharness.port}/en-test/remoting/" />
                <property name="platformTransactionManager" ref="transactionManager" />
                <... more .../>
Notice that the transactionManager is injected into the KSBConfigurer directly. This is because only KSB, and not Rice, supports this type of configuration. The DataSource is injected normally. When doing this, the OJB setup is entirely in the hands of the client application. That requires nothing more than providing an OJB.properties file at the root of the classpath so OJB can load itself. KSB will automatically register its mappings with OJB, so they don't need to be included in the repository.xml file.
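Where the DataSource comes from JNDI rather than a DriverManagerDataSource, the definition might be sketched with Spring's JndiObjectFactoryBean as follows. The JNDI name is hypothetical; use whatever name your container binds the connection pool under.

```xml
<!-- Hypothetical JNDI name; the bean id stays "dataSource" so existing references still resolve -->
<bean id="dataSource" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiName" value="java:comp/env/jdbc/myappDataSource" />
</bean>
```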
To allow external bus clients to invoke services on the bus-connected node, you must configure the KSBDispatcherServlet in the web application's web.xml file. For example:
<servlet>
    <servlet-name>remoting</servlet-name>
    <servlet-class>org.kuali.rice.ksb.messaging.servlet.KSBDispatcherServlet</servlet-class>
    <load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
    <servlet-name>remoting</servlet-name>
    <url-pattern>/remoting/*</url-pattern>
</servlet-mapping>
This allows bus-exposed services to be accessed at a URL like http://yourlocalip:8080/myapp/remoting/[KSB:service name]. Notice how this URL corresponds to the configured serviceServletUrl property on the KSBConfigurer.
The service bus leverages the Rice configuration system for its configuration. Here is a comprehensive set of configuration parameters that you can use to configure the Kuali Service Bus:
Table 8.1. KSB Configuration Parameters
Parameter | Required | Description | Default Value |
---|---|---|---|
bam.enabled | | Whether Business Action Messaging is enabled. | false |
bus.refresh.rate | | How often the service bus will update the services it has deployed in minutes. | 30 |
bus.services | | A list of services to deploy on the bus. | |
bus.storeAndForward | | Determines whether to persist the call to the remote service on the bus before invoking the service. | |
dev.mode | no | | false |
service.namespace | yes | | None |
message.persistence | no | | false |
message.delivery | no | | asynchronous |
message.off | no | | false |
ksb.mode | | The mode that KSB will run in; choices are "local", "embedded", or "remote". | local |
ksb.url | | The base URL of KSB services and pages. | ${application.url}/ksb |
RouteQueue.maxRetryAttempts | no | | 7 |
RouteQueue.timeIncrement | no | | 1 hour |
Routing.ImmediateExceptionRouting | no | | false |
RouteQueue.maxRetryAttemptsOverride | no | | None |
rice.ksb.batch.mode | | A service bus mode suitable for running batch jobs; it, like the KSB dev mode, runs only local services. | local |
rice.ksb.loadModuleConfiguration | | Determines whether the KSB should load as a service itself as a KRAD module. | true |
rice.ksb.struts.config.files | | The struts-config.xml configuration file that the KSB portion of the Rice application will use. | /ksb/WEB-INF/struts-config.xml |
threadPool.size | | The size of the KSB thread pool. | 5 |
useQuartzDatabase | no | | false |
ksb.org.quartz.* | no | | None |
rice.ksb.config.allowSelfSignedSSL | no | | false |
dev.mode: Indicates whether this node should export and consume services from the entire service bus. If set to true, then the machine will not register its services in the global service registry. Instead, it can only consume services that it has available locally. In addition to this, other nodes on the service bus will not be able to "see" this node and will therefore not forward any messages to it.
service.namespace: An identifier that indicates the name of the "logical" node on the service bus. If the application is running in a cluster, this will typically be the same for each machine in the cluster. The service namespace is used for namespacing of services, among other things. This can also be set directly on RiceConfigurer using the serviceNamespace property.
message.persistence: If true, then messages will be persisted to the datastore. Otherwise, they will only be stored in memory. If message persistence is not turned on and the server is shut down while there are still messages that need to be sent, those messages will be lost. For a production environment, it is recommended that you set message.persistence to true.
message.delivery: Can be set to either synchronous or asynchronous. If this is set to synchronous, then messages that are sent in an asynchronous fashion using the KSB API will instead be sent synchronously. This is useful in certain development and unit testing scenarios. For a production environment, it is recommended that you set message.delivery to asynchronous.
It is strongly recommended that you set message.delivery to asynchronous for all cases except when implementing automated tests or short-lived programs that interact with the service bus.
message.off: If set to true, then asynchronous messages will not be sent. If message persistence is turned on, they will be persisted in the message store and can even be picked up later using the Message Fetcher. However, if message persistence is turned off, these messages will be lost. This can be useful in certain debugging or testing scenarios.
RouteQueue.maxRetryAttempts: Sets the default number of retries that will be executed if a message fails to be sent. You can also customize this retry count for a specific service (see Exposing Services on the Bus).
RouteQueue.timeIncrement: Sets the default time increment between retry attempts. As with RouteQueue.maxRetryAttempts, you can also configure this at the service level.
Routing.ImmediateExceptionRouting: If set to true, then messages that fail to be sent will not be retried. Instead, their MessageExceptionHandler will be invoked immediately.
RouteQueue.maxRetryAttemptsOverride: If set to a number, it will temporarily set the retry attempts for ALL services going into exception routing. You can set the number arbitrarily high to prevent all messages in a node from making it to exception routing if they are having trouble. The message.off parameter produces the same result.
useQuartzDatabase: When using the embedded Quartz scheduler started by the KSB, indicates whether that Quartz scheduler should store its entries in the database. If this is true, then the appropriate Quartz properties should be set as well (see ksb.org.quartz.* below).
ksb.org.quartz.*: Can be used to pass Quartz properties to the embedded Quartz scheduler. See the configuration documentation on the Quartz site. Essentially, any property prefixed with ksb.org.quartz. will have the "ksb." portion stripped and will be sent as a configuration parameter to the embedded Quartz scheduler.
rice.ksb.config.allowSelfSignedSSL: If true, then the bus will allow communication using the HTTPS protocol between machines with self-signed certificates. By default, this is not permitted, and an attempt results in an error.
It is best practice to set this to 'true' only in non-production environments!
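To illustrate the Quartz passthrough described above, here is a sketch of a Rice config file that turns on database-backed scheduling. The values are the ones used in the RiceConfigurer example at the start of this section; whether they suit a given deployment is an assumption.

```xml
<config>
    <!-- store Quartz entries in the database -->
    <param name="useQuartzDatabase">true</param>
    <!-- the "ksb." prefix is stripped, so Quartz receives org.quartz.scheduler.instanceId and so on -->
    <param name="ksb.org.quartz.scheduler.instanceId">AUTO</param>
    <param name="ksb.org.quartz.scheduler.instanceName">KSBScheduler</param>
    <param name="ksb.org.quartz.jobStore.isClustered">true</param>
    <param name="ksb.org.quartz.jobStore.tablePrefix">KR_QRTZ_</param>
</config>
```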
In addition to the configuration parameters that you can specify using the Rice configuration system, the KSBConfigurer bean itself has some properties that can be injected in order to configure it:
By default, KSB uses an embedded Quartz scheduler for scheduling the retry of messages that fail to be sent. If desired, a Quartz scheduler can instead be injected into the KSBConfigurer and it will use that scheduler instead. See Quartz Scheduling for more detail.
The messageDataSource property specifies the javax.sql.DataSource to use for storing the asynchronous message queue. If not specified, this defaults to the DataSource injected into the RiceConfigurer.
If this DataSource is injected, then the registryDataSource must also be injected and vice versa.
A companion property specifies the non-transactional javax.sql.DataSource that corresponds to the messageDataSource property. This datasource instance must not be transactional. If not specified, this defaults to the nonTransactionalDataSource injected into the RiceConfigurer.
The registryDataSource property specifies the javax.sql.DataSource to use for reading and writing from the Service Registry. If not specified, this defaults to the DataSource injected into the RiceConfigurer.
If this DataSource is injected, then the messageDataSource must also be injected and vice versa.
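As a sketch of the paired injection described above, a KSBConfigurer with dedicated message and registry data sources might be declared like this. The bean ids ksbMessageDataSource and ksbRegistryDataSource are hypothetical and would be defined elsewhere in the Spring file.

```xml
<bean class="org.kuali.rice.ksb.messaging.config.KSBConfigurer">
    <property name="serviceServletUrl" value="http://yourlocalip:8080/myapp/remoting" />
    <!-- hypothetical bean ids; the two properties are injected together, as required above -->
    <property name="messageDataSource" ref="ksbMessageDataSource" />
    <property name="registryDataSource" ref="ksbRegistryDataSource" />
</bean>
```

The non-transactional counterpart is left out here, so it falls back to the nonTransactionalDataSource injected into the RiceConfigurer.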
The application needs to do one more thing to begin publishing services to the bus: Configure the KSBConfigurer object. This can be done using Spring or programmatically. We'll use Spring because it's the easiest way to get things configured:
<bean id="jotm" class="org.springframework.transaction.jta.JotmFactoryBean">
    <property name="defaultTimeout" value="${transaction.timeout}" />
</bean>
<bean id="dataSource" class="org.kuali.rice.core.database.XAPoolDataSource">
    <property name="transactionManager" ref="jotm" />
    <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver" />
    <property name="maxSize" value="25" />
    <property name="minSize" value="2" />
    <property name="maxWait" value="5000" />
    <property name="validationQuery" value="select 1 from dual" />
    <property name="url" value="jdbc:oracle:thin:@129.79.44.172:1521:XE" />
    <property name="username" value="myapp" />
    <property name="password" value="password" />
</bean>
<bean id="nonTransactionalDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver" />
    <property name="maxActive" value="50" />
    <property name="minIdle" value="7" />
    <property name="initialSize" value="7" />
    <property name="validationQuery" value="select 1 from dual" />
    <property name="url" value="jdbc:oracle:thin:@129.79.44.172:1521:XE" />
    <property name="username" value="myapp" />
    <property name="password" value="password" />
    <property name="accessToUnderlyingConnectionAllowed" value="true" />
</bean>
<bean id="rice" class="org.kuali.rice.core.config.RiceConfigurer">
    <property name="serviceNamespace" value="MyApp" />
    <property name="dataSource" ref="dataSource" />
    <property name="nonTransactionalDataSource" ref="nonTransactionalDataSource" />
    <property name="transactionManager" ref="jotm" />
    <property name="userTransaction" ref="jotm" />
    <property name="ksbConfigurer">
        <bean class="org.kuali.rice.ksb.messaging.config.KSBConfigurer" />
    </property>
</bean>
The application is now ready to deploy services to the bus. Let's take a quick look at the Spring file above and what's going on there: The following configures JOTM, which is currently required to run KSB.
<bean id="jotm" class="org.springframework.transaction.jta.JotmFactoryBean" />
Next, we configure the XAPoolDataSource and the non-transactional BasicDataSource. This is standard data source configuration. The XAPoolDataSource is configured through Spring and not JNDI so it can take advantage of JOTM. Servlet containers, which don't support JTA, require this configuration step so the datasource will use JTA.
<bean id="dataSource" class="org.kuali.rice.core.database.XAPoolDataSource">
    <property name="transactionManager" ref="jotm" />
    <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver" />
    <property name="maxSize" value="25" />
    <property name="minSize" value="2" />
    <property name="maxWait" value="5000" />
    <property name="validationQuery" value="select 1 from dual" />
    <property name="url" value="jdbc:oracle:thin:@129.79.44.172:1521:XE" />
    <property name="username" value="myapp" />
    <property name="password" value="password" />
</bean>
<bean id="nonTransactionalDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver" />
    <property name="maxActive" value="50" />
    <property name="minIdle" value="7" />
    <property name="initialSize" value="7" />
    <property name="validationQuery" value="select 1 from dual" />
    <property name="url" value="jdbc:oracle:thin:@129.79.44.172:1521:XE" />
    <property name="username" value="myapp" />
    <property name="password" value="password" />
    <property name="accessToUnderlyingConnectionAllowed" value="true" />
</bean>
Next, we configure the bus:
<bean id="rice" class="org.kuali.rice.core.config.RiceConfigurer">
    <property name="serviceNamespace" value="MyApp" />
    <property name="dataSource" ref="dataSource" />
    <property name="nonTransactionalDataSource" ref="nonTransactionalDataSource" />
    <property name="transactionManager" ref="jotm" />
    <property name="userTransaction" ref="jotm" />
    <property name="ksbConfigurer">
        <bean class="org.kuali.rice.ksb.messaging.config.KSBConfigurer" />
    </property>
</bean>
We are injecting JOTM, and the datasources. The injection of the KSBConfigurer class into the ksbConfigurer property tells this instance of Rice to start the Service Bus. The final necessary step is setting the 'serviceNamespace' property to some value that will identify all services deployed from this node as a member of this node. Effectively, this is the name of the application.
At this point, the application is configured to use the bus, both for publishing services and to send messages to services. Usually, applications will publish services on the bus using the KSBConfigurer or the KSBExporter classes. See Acquiring and invoking services for more detail.
As noted in Configuration Parameters, it is possible to configure message delivery to run asynchronously or synchronously. It is important to understand that asynchronous messaging should be used in almost all cases.
Asynchronous messaging will result in messages being sent in a separate thread after the original transaction that requested the message to be sent is committed. This is the appropriate behavior in a “fire-and-forget” messaging model. The option to configure message delivery as synchronous was added for two reasons:
To allow for the implementation of automated unit tests which could perform various tests without having to write “polling” code to wait for asynchronous messaging to complete.
For short-lived programs (such as batch programs) which need to send messages. This allows for a guarantee that all messages will be sent prior to the application being terminated.
The second case is the only case where synchronous messaging should be used in a production setting, and even then it should be used with care. Synchronous message processing in Rice currently has the following major differences from asynchronous messaging that need to be understood:
Order of Execution
Exception Handling
In asynchronous messaging, messages are queued up until the end of the transaction, and then sent after the transaction is committed (technically, they are sent when the transaction is committed).
In synchronous messaging, messages are processed immediately when they are “sent”. This results in a different ordering of execution when using these two different messaging models.
In asynchronous messaging, whenever there is a failure processing a message, an exception handler is invoked. Recovery from such failures can include resending the message multiple times, or recording and handling the error in some other way. Since all of this is happening after the original transaction was committed, it does not affect the original processing which invoked the sending of the message.
With synchronous messaging, since the message processing is invoked immediately and the calling code blocks until the processing is complete, any errors raised during messaging will be thrown back up to the calling code. This means that if you are writing something like a batch program which relies on synchronous messaging, you must be aware of this and add code to handle any errors if you want to deal with them gracefully.
Another implication of this is that message exception handlers will not be invoked in this case. Additionally, because an exception is being thrown, this will typically trigger a rollback in any transaction that the calling code is running, so transactional issues must be dealt with as well. For example, if the failure of a single message shouldn’t cause the sending of all messages in a batch job to fail, then each message will need to be sent in its own transaction, and errors handled appropriately.
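For a short-lived batch program of the kind discussed above, the delivery mode might be switched in the Rice config file as sketched below. Enabling message.persistence alongside it mirrors the test configuration shown earlier and is an assumption, not a requirement.

```xml
<config>
    <!-- process each message immediately, before the sending call returns -->
    <param name="message.delivery">synchronous</param>
    <!-- assumption: persistence kept on, as in ksb-test-config.xml -->
    <param name="message.persistence">true</param>
</config>
```

With this setting, any failure surfaces as an exception in the calling code, so the batch program needs its own error handling and transaction boundaries.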