Saturday, February 16, 2013

Writing Custom Broker for WSO2 CEP

In this post I'll explain how to create a simple Custom Broker in WSO2 CEP 2.1.x.

A sample template Custom Broker project is linked at the end of this post.

The first step is to create a Java project (here I have used Apache Maven).
Then create an appropriate Broker Type by implementing "org.wso2.carbon.broker.core.BrokerType", and an appropriate Broker Type Factory by implementing "org.wso2.carbon.broker.core.BrokerTypeFactory". Both interfaces come from the jar org.wso2.carbon.broker.core-4.0.7.jar.

org.wso2.carbon.broker.core-4.0.7.jar is available in the WSO2 public Maven repository, which you can add to your project's pom.xml as follows:

<repositories>
    <repository>
        <id>wso2-maven2-repository</id>
        <name>WSO2 Maven2 Repository</name>
        <url>http://dist.wso2.org/maven2</url>
    </repository>
</repositories>

Alternatively, you can find it at CEP_HOME/repository/components/plugins/ as org.wso2.carbon.broker.core_4.0.7.jar

BrokerType is an interface, so your implementation (BrokerTypeImpl) has to implement the following methods.

package org.wso2.carbon.broker.core;

import org.apache.axis2.engine.AxisConfiguration;
import org.wso2.carbon.broker.core.exception.BrokerEventProcessingException;

/**
 * This is a broker type. these interface let users to publish subscribe messages according to
 * some type. this type can either be local, jms or ws
 */
public interface BrokerType {

    /**
     * object which describes this type. it contains the name and available properties.
     *
     * @return - type dto
     */
    BrokerTypeDto getBrokerTypeDto();

    /**
     * subscribe to the connection specified in the broker configuration.
     *
     * @param topicName           - topic name to subscribe
     * @param brokerListener      - broker type will invoke this when it receive events
     * @param brokerConfiguration - broker configuration details
     * @throws BrokerEventProcessingException - if can not subscribe to the broker
     */
    String subscribe(String topicName,
                     BrokerListener brokerListener,
                     BrokerConfiguration brokerConfiguration,
                     AxisConfiguration axisConfiguration) throws BrokerEventProcessingException;

    /**
     * publish a message to a given connection with the broker configuration.
     *
     * @param topicName           - topic name to publish messages
     * @param object              - message to send
     * @param brokerConfiguration - broker configuration to be used
     * @throws BrokerEventProcessingException - if the message can not publish
     */
    void publish(String topicName,
                 Object object,
                 BrokerConfiguration brokerConfiguration) throws BrokerEventProcessingException;


    /**
     * publish test message to check the connection with the broker configuration.
     *
     * @param brokerConfiguration - broker configuration to be used
     * @throws BrokerEventProcessingException - if the message can not publish
     */
    void testConnection(BrokerConfiguration brokerConfiguration) throws BrokerEventProcessingException;

    /**
     * this method unsubscribes the subscription from the broker.
     *
     * @param topicName
     * @param brokerConfiguration
     * @throws BrokerEventProcessingException
     */
    void unsubscribe(String topicName,
                     BrokerConfiguration brokerConfiguration,
                     AxisConfiguration axisConfiguration, String subscriptionId)
            throws BrokerEventProcessingException;

}

Here you have to write the appropriate client code for both sending and receiving events.

The publish() method of the BrokerType is called whenever CEP sends an output, so your code for publishing the message goes in publish(). The message Object passed to publish() can be any of the following:
  • java.lang.String
  • java.util.Map
  • org.apache.axiom.om.OMElement
  • org.wso2.carbon.databridge.commons.Event
Your implementation doesn't need to support all of these message types; supporting the appropriate one is adequate. This is because, at the bucket output mapping, the user can select the appropriate message type mapping for the events being sent.
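
As a rough illustration of supporting only some message types, the sketch below shows the kind of type check a publish() implementation might perform. It is a standalone example, not WSO2 code: the class PublishDispatchSketch and its method toPayload are hypothetical names, and it handles only the two JDK types from the list above (String and Map) so that it runs without the CEP jars.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, standalone sketch: not part of the WSO2 broker API.
public class PublishDispatchSketch {

    // Converts a supported message object into the text a client would send.
    // Unsupported types are rejected, as a custom broker may choose to do.
    public static String toPayload(Object message) {
        if (message instanceof String) {
            return (String) message;
        }
        if (message instanceof Map) {
            StringBuilder sb = new StringBuilder();
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) message).entrySet()) {
                if (sb.length() > 0) {
                    sb.append('&');
                }
                sb.append(entry.getKey()).append('=').append(entry.getValue());
            }
            return sb.toString();
        }
        throw new IllegalArgumentException("Unsupported message type: "
                + (message == null ? "null" : message.getClass().getName()));
    }

    public static void main(String[] args) {
        Map<String, Object> event = new LinkedHashMap<String, Object>();
        event.put("symbol", "WSO2");
        event.put("price", 12.5);
        System.out.println(toPayload("plain text event"));
        System.out.println(toPayload(event));
    }
}
```

In a real broker the same check would sit at the top of publish(), and an unsupported type would more likely be reported through BrokerEventProcessingException.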

Similarly, when a subscription is made, subscribe() is called with the BrokerListener associated with the topic. BrokerListener has a method onEvent(Object message), to which your client code must pass every event it receives. The events passed to onEvent() must be in one of the following formats:
  • java.util.Map
  • org.apache.axiom.om.OMElement
  • org.wso2.carbon.databridge.commons.Event
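
The subscribe/onEvent/unsubscribe contract described above can be sketched with a small in-memory stand-in. Everything here is an assumption made for illustration: EventListener only mimics the onEvent(Object) method of BrokerListener, and InMemoryTopicSketch, deliver, and the UUID-based subscription ID are hypothetical and not taken from the WSO2 code base.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, standalone sketch: not part of the WSO2 broker API.
public class InMemoryTopicSketch {

    // Stand-in for BrokerListener; only onEvent(Object) is modelled.
    public interface EventListener {
        void onEvent(Object message);
    }

    // topic name -> (subscription ID -> listener)
    private final Map<String, Map<String, EventListener>> subscriptions =
            new ConcurrentHashMap<>();

    // Registers the listener for the topic and returns a subscription ID.
    public String subscribe(String topicName, EventListener listener) {
        String subscriptionId = UUID.randomUUID().toString();
        subscriptions
                .computeIfAbsent(topicName, k -> new ConcurrentHashMap<>())
                .put(subscriptionId, listener);
        return subscriptionId;
    }

    // Removes one subscription; unknown topics or IDs are ignored.
    public void unsubscribe(String topicName, String subscriptionId) {
        Map<String, EventListener> listeners = subscriptions.get(topicName);
        if (listeners != null) {
            listeners.remove(subscriptionId);
        }
    }

    // Called by the receiving client code for each incoming message.
    // Returns the number of listeners that were notified.
    public int deliver(String topicName, Object message) {
        Map<String, EventListener> listeners = subscriptions.get(topicName);
        if (listeners == null) {
            return 0;
        }
        int notified = 0;
        for (EventListener listener : listeners.values()) {
            listener.onEvent(message);
            notified++;
        }
        return notified;
    }

    public static void main(String[] args) {
        InMemoryTopicSketch broker = new InMemoryTopicSketch();
        List<Object> received = new ArrayList<>();
        String id = broker.subscribe("PurchaseTopic", received::add);
        broker.deliver("PurchaseTopic", Collections.singletonMap("item", "book"));
        broker.unsubscribe("PurchaseTopic", id);
        broker.deliver("PurchaseTopic", Collections.singletonMap("item", "pen"));
        System.out.println("Events received: " + received.size());
    }
}
```

The point of the sketch is the shape of the flow: subscribe() hands back an ID, the receiving code pushes each message into onEvent(), and unsubscribe() uses the ID to stop delivery.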
Likewise, your BrokerTypeFactory implementation (BrokerTypeFactoryImpl) has to implement the getBrokerType() method.

package org.wso2.carbon.broker.core;

public interface BrokerTypeFactory {
    BrokerType getBrokerType();
}

Then you have to build the jar and deploy that to the CEP server.

If it's a plain jar, add it to CEP_HOME/repository/components/lib, or
if it's an OSGi bundle, add it to CEP_HOME/repository/components/dropins

Next, create a file called "broker.xml" in the CEP_HOME/repository/conf directory, and add the implementation class as in the following XML:

<brokerTypes xmlns="http://wso2.org/carbon/broker">
    <brokerType class="org.test.cep.broker.BrokerTypeFactoryImpl"/>
</brokerTypes>

Note: In the above code snippet, replace "org.test.cep.broker.BrokerTypeFactoryImpl" with the name of your implementation class.

Finally, restart the CEP server, log in, and go to Configure -> Broker -> Add, where you will be able to configure your custom broker.



You can download a sample template project from: https://dl.dropbox.com/u/17922825/blog/cep-ext-broker.zip

Friday, January 11, 2013

Using Siddhi CEP as a Java library


WSO2 Complex Event Processor uses Siddhi as its back-end runtime engine. Siddhi was initially developed at http://siddhi.sourceforge.net/, and its development now continues as a commons project of WSO2 Inc.
Siddhi is released under the Apache License v2, and it is designed for real-time processing in both standalone and distributed modes.

Siddhi is basically a Java library that's embedded in WSO2 CEP server to provide enterprise integration for Complex Event Processing.

Since Siddhi is just a Java library, here are some samples demonstrating Siddhi’s capabilities as a library.

You can find the WSO2 CEP product from: 

You can find the Documentation for WSO2 CEP from: 

You can find the Documentation for Siddhi Language Specification from: 

Saturday, December 8, 2012

Single Sign On across Web Applications and Web Services

Here the requirement is single sign-on across web applications and web services. In more detail: once the user is authenticated to access one WebApp, he should be able to access other WebApps as well, and those WebApps should in turn be able to access a set of back-end services with the logged-in user's access rights, with no further authentication. The back-end services can also be configured to authorize the user based on different user claims (e.g. email).


In this scenario we can assume:
•  trust relation between Identity Server and Resource Server (ESB Proxy),
•  trust relation between Identity Server and Resource Client (Web App)
The communications are:

  1. The user connects to the WebApp, which is the “Resource Client”.
  2. The WebApp finds out that the user is not authenticated and redirects to the SAML2 IdP.
  3. The SAML2 IdP checks whether the user has an authenticated session; if not, it prompts for credentials. Once authenticated there, the user is redirected back to the WebApp (Resource Client) with a SAML token containing the set of claims requested by the WebApp.
  4. Now the WebApp (Resource Client) needs to access a back-end web service with the logged-in user's access rights. The WebApp processes the “SAML assertion token” and creates the STSToken, with which it calls the Proxy Service (Resource Server).
  5. The Proxy Service (Resource Server) checks the “SAML assertion token” and allows access to the back-end web service.
This was tested with:
•  WSO2 Identity Server 4.0.0
•  WSO2 Enterprise Service Bus 4.5.1
•  Apache Tomcat 7.0.25
•  Apache Subversion 1.6.12
•  Apache Maven 3.2.3
•  Java 1.6.0_26
Steps,

Step 1,

Download and deploy WSO2 IS 4.0.0 from http://wso2.org/products/identity-server/

Step 2,
Download WSO2 ESB 4.5.1 from http://wso2.com/products/enterprise-service-bus/.
When deploying the ESB, shift its ports by 1: edit the <ESB_HOME>/repository/conf/carbon.xml file and set the Offset element as follows
<Offset>1</Offset>


Step 3,

Download and deploy Apache Tomcat

Step 4,
Check out https://svn.wso2.org/repos/wso2/people/suho/sso-webapp-ws into a local directory named sso-webapp-ws.
This contains all the artifacts needed for this sample.

Step 5,
Setup WSO2 IS for SSO for Web Apps
  1. Login to IS
  2. Click SAML SSO from the management console
  3. Add SSO details

    For the avis.com WebApp
    Issuer: http://localhost:8081/avis.com/consumer
    Assertion Consumer URL: http://localhost:8081/avis.com/consumer
    Check “Enable Attribute Profile” and add the following claims
              http://wso2.org/claims/givenname
              http://wso2.org/claims/emailaddress
    For the travelocity.com WebApp
    Issuer: http://localhost:8081/travelocity.com/consumer
    Assertion Consumer URL: http://localhost:8081/travelocity.com/consumer
    Check “Enable Attribute Profile” and add the following claims
              http://wso2.org/claims/givenname
              http://wso2.org/claims/emailaddress
    Once they are added successfully, you will get a Consumer Index for each entry.

    Now the SSO setup in the WSO2 IS is done.

Step 6,
Configuring the WebApps
  1. You can make the following changes either by editing the war file directly, or by editing the sources in src-dist and rebuilding.
  2. Edit the WEB-INF/web.xml of each WebApp and add the appropriate “Consumer Index” returned by WSO2 IS as the param-value for AttributeConsumingServiceIndex.
  3. Edit the WEB-INF/classes/filepath.properties of each WebApp and add the appropriate
    properties. You can find all the necessary files in the resources directory.
    E.g.
    keystore.file=/<path>/sso-webapp-ws/resources/keystore/wso2carbon.jks
    repo.directory=/<path>/sso-webapp-ws/resources/repo
    bearerpolicy.file=/<path>/sso-webapp-ws/resources/bearer-policy.xml
    service.endpoint=http://localhost:8281/services/echoProxy

Step 7,
Add the proxy service
  1. Login to ESB
  2. Click on Add Proxy Service from the management console
  3. Create a Pass Through Proxy
  4. Add the proxy name and endpoint
    Proxy Service Name: echoProxy
    Target Endpoint: “Enter URL”
    Target URL: http://localhost:8281/services/echo
    And click “Create”

    Here the echo service (Endpoint service) is deployed on ESB by default

Step 8,
Add the policy to the registry to secure the Proxy Service

  1. Click on Registry from the ESB management console
  2. Navigate to /_system/config/repository and click on the repository
  3. Under “Entries”, click on “Add Resource” to add the policy file
  4. Now browse and add the bearer-policy.xml. This will be in the resources folder you
    checked out. Click “Add” to add the file.

Step 9,
Securing the proxy service
  1. Click on List under Services in the ESB management console, and click on the “Unsecured” link next to the echoProxy service entry.
  2. Select “yes”
  3. In the “Policy From Registry” section, click on Configuration Registry to add the policy from the config registry.
  4. Browse for the added “bearer-policy.xml” and click “OK”
  5. Click “Next”, then click on the “admin” and “wso2carbon.jks” checkboxes, select the private key store as “wso2carbon.jks” and click on “Finish” to activate security.

Step 10,
Deploy the webapps in Tomcat

Step 11,
  1. Enter the following in the redirected login page
    User Name: admin
    Password: admin

    After a successful login you will be redirected to the home page, where the WebApp will call the proxy using the “SAML assertion token” received at the SSO login.
  2. If the login at the Proxy Service succeeds, you will get the following page
    Note: Here the Web Service Response will be “Hi WSO2”
  3. If the login at the Proxy Service fails, you will get the following page
    Note: Here the Web Service Response will be “’Hi WSO2’ expected but, no response from the web service”

I would like to thank the WSO2 folks Suresh Attanayake, Prabath Siriwardena, and Balachandiran Ajanthan for helping me with this.


Wednesday, November 21, 2012

Enabling WSO2 ESB APIs to Pass-Through response Content-Types



This post explains how to configure WSO2 ESB 4.5.x to pass through response Content-Types.

WSO2 ESB is built to work in SOAP environments, so a WSO2 ESB proxy service always sends the response message with the same Content-Type as the request, which is the expected behaviour.
This works fine for SOAP, because the incoming messages to the proxy service and the outgoing messages from the proxy service are both SOAP messages.



But this is not the case for REST calls, where the proxy service may need to mimic the actual service exactly.
E.g. the client sends the request with the Content-Type "application/json", the actual service receives the request with the same "application/json" Content-Type, and it responds with the Content-Type "text/plain", which the client can accept.


This can be achieved using Message Relay, but if you need to achieve it with your custom formatters, you have to set the Content-Type manually, as below.

<api name="api_content_type_passthrough" context="/esbsanitycheck">
  <resource methods="POST GET DELETE PUT">
     <inSequence>
        <send>
           <endpoint key="ep_esb_sanity_check"/>
        </send>
     </inSequence>
     <outSequence>
        <property name="messageType" expression="$trp:Content-Type" scope="axis2"/>
        <property name="ContentType" expression="$trp:Content-Type" scope="axis2"/>
        <send/>
     </outSequence>
  </resource>
</api>

Monday, August 27, 2012

Distributed Processing Sample for WSO2 CEP


Today we released WSO2 Complex Event Processor 2.0.0 Milestone 2.
This is available at  https://svn.wso2.org/repos/wso2/people/suho/packs/cep/wso2cep-2.0.0-M2.zip

One of the key features of this CEP is its support for distributed processing via the Siddhi CEP Engine. To demonstrate this capability I came up with a sample on distributed processing. This sample uses the Siddhi CEP Engine for processing and the ActiveMQ JMS broker to publish and subscribe to events.

To run the Distributed Processing Sample, follow the steps given below:
 
Step 01: Configure and run ActiveMQ on your local machine
Download ActiveMQ from "http://activemq.apache.org/activemq-543-release.html".
Unzip the distribution and run the ActiveMQ server using the command “./activemq console” from apache-activemq-xxx/bin (on Linux)
Note: WSO2CEP has been tested with ActiveMQ 5.4.3

For each CEP node in the cluster follow the steps from 02 to 08.

Step 02: Deploy CEP server 
Unzip the CEP server (Do not start the server).
      
Step 03: Change the CEP server Offset.
If you are running multiple servers on the same machine, change the offset
in the file "wso2cep-2.0.0-2/repository/conf/carbon.xml" to a different number for each server.
E.g.
<Offset>1</Offset>

This is to overcome server port conflicts.
E.g. if three WSO2 servers are going to be deployed on the same machine, they can have the offsets 1, 2, and 3.
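
The effect of the offset is simple addition: each port the server opens is its default port plus the offset. The sketch below assumes the usual Carbon management console defaults of 9443 (HTTPS) and 9763 (HTTP). The class and method names are hypothetical, and you should check your own carbon.xml and transport configuration for the actual defaults.

```java
// Hypothetical, standalone sketch of how a port offset shifts server ports.
public class PortOffsetSketch {

    // Assumed Carbon management console defaults; verify against your setup.
    static final int DEFAULT_HTTPS_PORT = 9443;
    static final int DEFAULT_HTTP_PORT = 9763;

    // The effective port is the default port plus the configured offset.
    public static int effectivePort(int defaultPort, int offset) {
        if (offset < 0) {
            throw new IllegalArgumentException("Offset must not be negative");
        }
        return defaultPort + offset;
    }

    public static void main(String[] args) {
        // Three servers on one machine with offsets 1, 2 and 3.
        for (int offset = 1; offset <= 3; offset++) {
            System.out.println("offset " + offset
                    + ": https=" + effectivePort(DEFAULT_HTTPS_PORT, offset)
                    + ", http=" + effectivePort(DEFAULT_HTTP_PORT, offset));
        }
    }
}
```

Under these assumptions, the server with offset 1 would expose its console on 9444, the next on 9445, and so on, so no two servers compete for the same port.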

Step 04: Copy the ActiveMQ jars.
Copy activemq-all-xxx.jar from the ActiveMQ home directory to the
wso2cep-2.0.0/samples/lib directory.
Copy activemq-core-xxx.jar and geronimo-j2ee-management_1.1_spec-1.0.1.jar
from apache-activemq-xxx/lib to the wso2cep-2.0.0/repository/components/lib directory
 
Step 06: Deploy the required broker manager configuration
This allows CEP to receive and send messages via the JMS broker.
To deploy it, run "ant deploy-broker-manager" from the wso2cep-2.0.0/samples/cep-samples directory.

Step 07: Configure input, output and queries
For this sample, the configurations of input, output and queries are in wso2cep-2.0.0-1/samples/cep-samples/conf/buckets/purchase-analyser-bucket.xml
When running multiple server nodes, each server needs to receive its input events through a different topic. To enable this behaviour, open the above file on each node and change the input topic to a different name. The default input topic name is “PurchaseTopic”.

E.g. changing the name to “PurchaseTopic1”:
<input brokername="activemqJmsBroker" topic="PurchaseTopic1" />

Note: To enable distributed processing (already enabled for this sample), the only change needed is to set the "siddhi.enable.distributed.processing" property to "true".

<engineproviderconfiguration engineprovider="SiddhiCEPRuntime">
     <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
     <property name="siddhi.enable.distributed.processing">true</property>
</engineproviderconfiguration>

Step 08: Deploy bucket configuration
Deploy the bucket by copying the configuration file purchase-analyser-bucket.xml to the directory wso2cep-2.0.0/repository/deployment/server/cepbuckets/

Step 09: Start all the servers
Start the CEP servers by running "./wso2server.sh" from the wso2cep-2.0.0/bin directory of each server

Step 10: Subscribing to the output topic
Start the subscriber of the output topic "PurchaseResults" by running "ant purchaseResultsSubscriber" in a separate terminal,
from the wso2cep-2.0.0/samples/cep-samples directory

Step 11: Publishing events
To publish events to all the servers (to their input topics as defined in Step 07), run the publisher from the wso2cep-2.0.0/samples/cep-samples directory
with the command "ant purchasePublisher -Dtopics=xxx,xxx,..."
The client will publish events to all the given topics in a round-robin manner.

E.g. if we have configured the buckets to receive their inputs via the input topics PurchaseTopic1, PurchaseTopic2 and PurchaseTopic3:
"ant purchasePublisher -Dtopics=PurchaseTopic1,PurchaseTopic2,PurchaseTopic3"
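
To make the round-robin behaviour concrete, here is a small standalone sketch of how events could be spread across the given topics. It is not the code of the purchasePublisher sample client: RoundRobinSketch and assign are hypothetical names, and the actual JMS send is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, standalone sketch of round-robin topic selection.
public class RoundRobinSketch {

    // Returns, for each event index 0..eventCount-1, the topic it would go to.
    public static List<String> assign(List<String> topics, int eventCount) {
        if (topics.isEmpty()) {
            throw new IllegalArgumentException("At least one topic is required");
        }
        List<String> targets = new ArrayList<String>();
        for (int i = 0; i < eventCount; i++) {
            // Cycle through the topics in order, wrapping around at the end.
            targets.add(topics.get(i % topics.size()));
        }
        return targets;
    }

    public static void main(String[] args) {
        // Same comma-separated form as the -Dtopics argument.
        String topicsArg = "PurchaseTopic1,PurchaseTopic2,PurchaseTopic3";
        List<String> topics = Arrays.asList(topicsArg.split(","));
        List<String> targets = assign(topics, 5);
        for (int i = 0; i < targets.size(); i++) {
            System.out.println("event " + i + " -> " + targets.get(i));
        }
    }
}
```

With three topics, consecutive events go to PurchaseTopic1, PurchaseTopic2, PurchaseTopic3 and then back to PurchaseTopic1, so each server sees roughly a third of the input.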


Step 12: Observation
You will be able to observe how the counts in the results steadily increase when messages are sent to any of the servers, and how the results remain consistent even when some servers go down and come back.