In this post I'll explain how to create a simple Custom Broker in
WSO2 CEP 2.1.x.
You can find a sample template Custom Broker project from
here.
The first step is to create a Java project (here I have used Apache Maven),
then create an appropriate Broker Type by extending "org.wso2.carbon.broker.core.BrokerType" and an appropriate
Broker Type Factory by extending the "org.wso2.carbon.broker.core.BrokerTypeFactory" from the jar
org.wso2.carbon.broker.core-4.0.7.jar
org.wso2.carbon.broker.core-4.0.7.jar will be available in the WSO2 public Maven Repository, which you can add to your project pom.xml as:
<repositories>
<repository>
<id>wso2-maven2-repository</id>
<name>WSO2 Maven2 Repository</name>
<url>http://dist.wso2.org/maven2</url>
</repository>
</repositories>
else you can also find that at CEP_HOME/repository/components/plugins/ as org.wso2.carbon.broker.core_4.0.7.jar
When extending the BrokerType, the BrokerTypeImpl has to implement the following methods.
package org.wso2.carbon.broker.core;
import org.apache.axis2.engine.AxisConfiguration;
import org.wso2.carbon.broker.core.exception.BrokerEventProcessingException;
/**
* This is a broker type. these interface let users to publish subscribe messages according to
* some type. this type can either be local, jms or ws
*/
public interface BrokerType {
/**
* object which describes this type. it contains the name and available properties.
*
* @return - type dto
*/
BrokerTypeDto getBrokerTypeDto();
/**
* subscribe to the connection specified in the broker configuration.
*
* @param topicName - topic name to subscribe
* @param brokerListener - broker type will invoke this when it receive events
* @param brokerConfiguration - broker configuration details
* @throws BrokerEventProcessingException - if can not subscribe to the broker
*/
String subscribe(String topicName,
BrokerListener brokerListener,
BrokerConfiguration brokerConfiguration,
AxisConfiguration axisConfiguration) throws BrokerEventProcessingException;
/**
* publish a message to a given connection with the broker configuration.
*
* @param topicName - topic name to publish messages
* @param object - message to send
* @param brokerConfiguration - broker configuration to be used
* @throws BrokerEventProcessingException - if the message can not publish
*/
void publish(String topicName,
Object object,
BrokerConfiguration brokerConfiguration) throws BrokerEventProcessingException;
/**
* publish test message to check the connection with the broker configuration.
*
* @param brokerConfiguration - broker configuration to be used
* @throws BrokerEventProcessingException - if the message can not publish
*/
void testConnection(BrokerConfiguration brokerConfiguration) throws BrokerEventProcessingException;
/**
* this method unsubscribes the subscription from the broker.
*
* @param topicName
* @param brokerConfiguration
* @throws BrokerEventProcessingException
*/
void unsubscribe(String topicName,
BrokerConfiguration brokerConfiguration,
AxisConfiguration axisConfiguration, String subscriptionId)
throws BrokerEventProcessingException;
}
Here you have to write the appropriate client code for both sending and receiving events.
The publish() method in the BrokerType will be called when CEP is sending an output, your implementaion to publish the message needs to go under the publish() method. The input parameter message Object, of the publish() method can be either of the following,
- java.lang.String
- java.util.Map
- org.apache.axiom.om.OMElement
- org.wso2.carbon.databridge.commons.Event
In your implementation you don't need to support all the message types, and supporting the appropriate type will be adequate. This is because at the bucket output mapping, user can select the appropriate message type mapping when sending the events.
At the same time when a subscription was made for the subscribe() method the appropriate BrokerListener will be also provided, which is associated with the topic. BrokerListener also have a method onEvent(Object message) to which your client code receiving the incoming events has to pass the received messages. Here the events passed to the onEvent() must have one of the following format,
- java.util.Map
- org.apache.axiom.om.OMElement
- org.wso2.carbon.databridge.commons.Event
When extending the BrokerTypeFactory, the BrokerTypeFactoryImpl has to implement the getBrokerType() method.
package org.wso2.carbon.broker.core;
public interface BrokerTypeFactory {
BrokerType getBrokerType();
}
Then you have to build the jar and deploy that to the CEP server.
If its a plain jar you have to add that to CEP_HOME/repository/components/lib or
If its a OSGi bundle then you can add that to CEP_HOME/repository/components/dropings
Next, you have to create a file called "broker.xml" at wso2cep-2.0.1/repository/conf directory, and add the implantation class as in the following XML:
<brokerTypes xmlns="http://wso2.org/carbon/broker">
<brokerType class="org.test.cep.broker.BrokerTypeFactoryImpl"/>
</brokerTypes>
Note: In the above code snippet change the "org.test.cep.broker.BrokerTypeFactoryImpl" with your implementation class name.
Finally, restart & login to the CEP server, go to Configure -> Broker -> Add, and there you will be able configure your custom broker.
You will be able to download a sample template project from :
https://dl.dropbox.com/u/17922825/blog/cep-ext-broker.zip