Monday, February 25, 2013

No connector available to access repository - FIXED! - Maven Wagon

If you encounter the following error when deploying artifacts to Sonatype with Maven 3, you have probably not configured Maven Wagon properly:

[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 24.444s
[INFO] Finished at: Tue Feb 26 06:37:48 IST 2013
[INFO] Final Memory: 10M/981M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-deploy-plugin:2.7
:deploy (default-deploy) on project test-core: Failed to deploy artifacts/metad
ata: No connector available to access repository xxx-maven2-repository (scp://x
xxx/maven2/) of type default using the available factories WagonRepositoryConne
ctorFactory -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e swi
tch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please re
ad the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecution
Exception

To fix this issue:
  • Add the wagon-ssh extension to your pom.
    <build>
        <extensions>
            <extension>
                <groupId>org.apache.maven.wagon</groupId>
                <artifactId>wagon-ssh</artifactId>
                <version>2.4</version>
            </extension>
        </extensions>
    </build>
  • Add a distributionManagement section to your pom.
    <distributionManagement>
        <repository>
            <id>xxx-maven2-repository</id>
            <name>Xxx Maven2 Repository</name>
            <url>scp://xxxx/maven2/</url>
        </repository>
        <snapshotRepository>
            <id>xxx-maven2-snapshot-repository</id>
            <name>Xxx Maven2 Snapshot Repository</name>
            <url>scp://xxxx/snapshots/maven2/</url>
        </snapshotRepository>
    </distributionManagement>
Now it works!
For more information on using extensions, refer to: http://maven.apache.org/guides/mini/guide-using-extensions.html

Monday, February 18, 2013

Using Apache Qpid with WSO2 CEP

Though WSO2 CEP 2.x.x can work with Qpid, it does not ship samples that demonstrate this; CEP only has samples for ActiveMQ.

Here I provide the steps to configure and run some samples using Apache Qpid.

The first step is downloading and running Apache Qpid.
You can download Qpid from the project web site. Note that I have used Qpid Java 0.18 for these samples; you can download that from here.

Next, unzip and start the broker.

Now, to configure WSO2 CEP with the Qpid JMS broker, follow the steps provided in Configuring JMS-Qpid Broker.

Test publishing and subscribing to JMS Map messages


Edit the jms-twitter-stockquote-analyser.xml provided in the /samples/cep-samples/conf/buckets directory, changing the input and output broker names as follows:
<bucket name="TwitterAndStockQuoteAnalyzer" xmlns="http://wso2.org/carbon/cep">
     ...
     <input topic="AllStockQuotes" brokerName="qpidJmsBroker">
     ...
     <input topic="TwitterFeed" brokerName="qpidJmsBroker">
     ...
     ...
     <output topic="PredictedStockQuotes" brokerName="qpidJmsBroker">
     ...
</bucket>

Here we use "qpidJmsBroker" as the brokerName because that is the name we defined when following Configuring JMS-Qpid Broker.

Next, copy this file to the /repository/deployment/server/cepbuckets directory.

To test the broker, download the client code from here and unzip cep-210-jms-client-qpid.zip.

To subscribe to the PredictedStockQuotes topic, open a terminal, go to the cep-210-jms-client-qpid directory, and run:
ant mapPredictedStockQuotesSubscriber
To publish to the AllStockQuotes topic, in another terminal from the same directory, run:
ant mapAllStockQuotesPublisher
To publish to the TwitterFeed topic, in another terminal from the same directory, run:
ant mapTwitterFeedPublisher
You will then be able to observe the outputs, which will be similar to the output of the Twitter and StockQuote Analyzer sample.

Test publishing and subscribing to JMS XML messages


Edit the xml-stockquote-anlyzer.xml provided in the /samples/cep-samples/conf/buckets directory, changing the input and output broker names as follows:

<cep:bucket name="XMLStockQuoteAnalyzer" xmlns:cep="http://wso2.org/carbon/cep">
     ...
     <cep:input topic="AllStockQuotes" brokerName="qpidJmsBroker">
     ...
     ...
     <cep:output topic="FastMovingStockQuotes" brokerName="qpidJmsBroker">
     ...
</cep:bucket>

Here we use "qpidJmsBroker" as the brokerName because that is the name we defined when following Configuring JMS-Qpid Broker.

Next, copy this file to the /repository/deployment/server/cepbuckets directory, and make sure to delete other bucket configurations in order to avoid conflicts.

If you don't have the client code (used for the previous sample), download it from here and unzip cep-210-jms-client-qpid.zip.

To subscribe to the FastMovingStockQuotes topic, open a terminal, go to the cep-210-jms-client-qpid directory, and run:
ant xmlFastMovingStockQuotesSubscriber
To publish to the AllStockQuotes topic, in another terminal from the same directory, run:
ant xmlAllStockQuotesPublisher
You will be able to observe the outputs, which will be similar to the output of the Stock Quote Analyzer sample.

Saturday, February 16, 2013

Writing Custom Broker for WSO2 CEP

In this post I'll explain how to create a simple Custom Broker in WSO2 CEP 2.1.x.

You can find a sample template Custom Broker project from here.

The first step is to create a Java project (here I have used Apache Maven).
Then create an appropriate Broker Type by implementing "org.wso2.carbon.broker.core.BrokerType" and an appropriate Broker Type Factory by implementing "org.wso2.carbon.broker.core.BrokerTypeFactory", both from the jar org.wso2.carbon.broker.core-4.0.7.jar.

org.wso2.carbon.broker.core-4.0.7.jar is available in the WSO2 public Maven Repository, which you can add to your project's pom.xml as:

<repositories>
    <repository>
        <id>wso2-maven2-repository</id>
        <name>WSO2 Maven2 Repository</name>
        <url>http://dist.wso2.org/maven2</url>
    </repository>
</repositories>

Alternatively, you can find it at CEP_HOME/repository/components/plugins/ as org.wso2.carbon.broker.core_4.0.7.jar.

When implementing BrokerType, your BrokerTypeImpl has to implement the following methods.

package org.wso2.carbon.broker.core;

import org.apache.axis2.engine.AxisConfiguration;
import org.wso2.carbon.broker.core.exception.BrokerEventProcessingException;

/**
 * This is a broker type. these interface let users to publish subscribe messages according to
 * some type. this type can either be local, jms or ws
 */
public interface BrokerType {

    /**
     * object which describes this type. it contains the name and available properties.
     *
     * @return - type dto
     */
    BrokerTypeDto getBrokerTypeDto();

    /**
     * subscribe to the connection specified in the broker configuration.
     *
     * @param topicName           - topic name to subscribe
     * @param brokerListener      - broker type will invoke this when it receive events
     * @param brokerConfiguration - broker configuration details
     * @throws BrokerEventProcessingException - if can not subscribe to the broker
     */
    String subscribe(String topicName,
                     BrokerListener brokerListener,
                     BrokerConfiguration brokerConfiguration,
                     AxisConfiguration axisConfiguration) throws BrokerEventProcessingException;

    /**
     * publish a message to a given connection with the broker configuration.
     *
     * @param topicName           - topic name to publish messages
     * @param object              - message to send
     * @param brokerConfiguration - broker configuration to be used
     * @throws BrokerEventProcessingException - if the message can not publish
     */
    void publish(String topicName,
                 Object object,
                 BrokerConfiguration brokerConfiguration) throws BrokerEventProcessingException;


    /**
     * publish test message to check the connection with the broker configuration.
     *
     * @param brokerConfiguration - broker configuration to be used
     * @throws BrokerEventProcessingException - if the message can not publish
     */
    void testConnection(BrokerConfiguration brokerConfiguration) throws BrokerEventProcessingException;

    /**
     * this method unsubscribes the subscription from the broker.
     *
     * @param topicName
     * @param brokerConfiguration
     * @throws BrokerEventProcessingException
     */
    void unsubscribe(String topicName,
                     BrokerConfiguration brokerConfiguration,
                     AxisConfiguration axisConfiguration, String subscriptionId)
            throws BrokerEventProcessingException;

}

Here you have to write the appropriate client code for both sending and receiving events.

The publish() method of the BrokerType is called when CEP sends an output, so your code for publishing the message goes in publish(). The message Object passed to publish() can be any of the following:
  • java.lang.String
  • java.util.Map
  • org.apache.axiom.om.OMElement
  • org.wso2.carbon.databridge.commons.Event
Your implementation does not need to support all of these message types; supporting the ones you need is adequate. This is because, in the bucket output mapping, the user can select the appropriate message type mapping when sending events.
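
As a rough illustration of supporting only some of the message types, the sketch below dispatches on the runtime type of the object that publish() would receive. It is a standalone helper, not part of the WSO2 API: the class name OutgoingMessageFormatter, the method toWireText, and the key=value text format are all assumptions made for this example. It handles java.lang.String and java.util.Map and rejects everything else.

```java
import java.util.Map;

/**
 * Hypothetical helper that a publish() implementation could call.
 * It supports only String and Map messages; the key=value output
 * format is an assumption made for illustration.
 */
final class OutgoingMessageFormatter {

    private OutgoingMessageFormatter() {
    }

    /** Converts a supported message object into text ready to be sent. */
    static String toWireText(Object message) {
        if (message instanceof String) {
            // String messages are sent unchanged.
            return (String) message;
        }
        if (message instanceof Map) {
            // Map messages are flattened to "key=value" pairs joined by commas.
            StringBuilder text = new StringBuilder();
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) message).entrySet()) {
                if (text.length() > 0) {
                    text.append(',');
                }
                text.append(entry.getKey()).append('=').append(entry.getValue());
            }
            return text.toString();
        }
        // OMElement and Event are deliberately not supported in this sketch.
        String typeName = (message == null) ? "null" : message.getClass().getName();
        throw new IllegalArgumentException("Unsupported message type: " + typeName);
    }
}
```

In a real broker type, the unsupported case would more likely be reported through BrokerEventProcessingException, since that is the exception publish() declares.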

Similarly, when a subscription is made through the subscribe() method, the appropriate BrokerListener, which is associated with the topic, is also provided. BrokerListener has a method onEvent(Object message), to which your client code receiving the incoming events has to pass the received messages. The events passed to onEvent() must be in one of the following formats:
  • java.util.Map
  • org.apache.axiom.om.OMElement
  • org.wso2.carbon.databridge.commons.Event
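
One way to wire received messages to listeners is sketched below. To keep it self-contained it uses a stand-in interface instead of the real org.wso2.carbon.broker.core.BrokerListener; only the onEvent(Object) method comes from the description above. The registry class, its method names, and the idea that the String returned by subscribe() acts as a subscription id are assumptions for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Stand-in for BrokerListener; only onEvent(Object) is taken from the post. */
interface EventListenerStandIn {
    void onEvent(Object message);
}

/** Hypothetical registry that keeps one listener per subscription. */
final class SubscriptionRegistry {

    private final Map<String, EventListenerStandIn> listeners = new ConcurrentHashMap<>();

    /** Remembers the listener and returns an id (assumed to be what subscribe() returns). */
    String subscribe(String topicName, EventListenerStandIn listener) {
        String subscriptionId = topicName + "-" + UUID.randomUUID();
        listeners.put(subscriptionId, listener);
        return subscriptionId;
    }

    /** Forgets the listener so that no further events are delivered to it. */
    void unsubscribe(String subscriptionId) {
        listeners.remove(subscriptionId);
    }

    /**
     * Called by the transport client code when a message arrives.
     * The payload is passed on as a java.util.Map, one of the accepted formats.
     * Returns false when the subscription is unknown.
     */
    boolean deliver(String subscriptionId, Map<String, Object> payload) {
        EventListenerStandIn listener = listeners.get(subscriptionId);
        if (listener == null) {
            return false;
        }
        // Copy the payload so later changes by the caller do not affect the event.
        listener.onEvent(new LinkedHashMap<>(payload));
        return true;
    }
}
```

A real implementation would call deliver() from whatever callback its messaging client provides, after converting the raw message into a Map, OMElement, or Event.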
When implementing the BrokerTypeFactory, the BrokerTypeFactoryImpl has to implement the getBrokerType() method.

package org.wso2.carbon.broker.core;

public interface BrokerTypeFactory {
    BrokerType getBrokerType();
}

Then you have to build the jar and deploy that to the CEP server.

If it's a plain jar, add it to CEP_HOME/repository/components/lib.
If it's an OSGi bundle, add it to CEP_HOME/repository/components/dropins.

Next, create a file called "broker.xml" in the wso2cep-2.0.1/repository/conf directory, and add the implementation class as in the following XML:

<brokerTypes xmlns="http://wso2.org/carbon/broker">
    <brokerType class="org.test.cep.broker.BrokerTypeFactoryImpl"/>
</brokerTypes>

Note: In the above code snippet, replace "org.test.cep.broker.BrokerTypeFactoryImpl" with your implementation class name.

Finally, restart the CEP server, log in, and go to Configure -> Broker -> Add, where you will be able to configure your custom broker.



You will be able to download a sample template project from : https://dl.dropbox.com/u/17922825/blog/cep-ext-broker.zip