Tuesday, November 8, 2016

Set up Hive to run on Ubuntu 15.04

This is tested on hadoop-2.7.3, and apache-hive-2.1.0-bin.

Improvement on Hive documentation : https://cwiki.apache.org/confluence/display/Hive/GettingStarted

Step 1

Make sure Java is installed

Installation instruction : http://suhothayan.blogspot.com/2010/02/how-to-set-javahome-in-ubuntu.html

Step 2

Make sure Hadoop is installed & running

Instruction : http://suhothayan.blogspot.com/2016/11/setting-up-hadoop-to-run-on-single-node_8.html


Add Hive and Hadoop home directories and paths


$ gedit ~/.bashrc

Add the following at the end (replace {hadoop path} and {hive path} with the proper directory locations):

export HADOOP_HOME={hadoop path}/hadoop-2.7.3

export HIVE_HOME={hive path}/apache-hive-2.1.0-bin
export PATH=$HIVE_HOME/bin:$PATH


$ source ~/.bashrc


Create /tmp and the hive.metastore.warehouse.dir directory in HDFS, and open up their permissions so that tables can be created in Hive (replace {user-name} with your system username):

$ hadoop-2.7.3/bin/hadoop fs -mkdir /tmp
$ hadoop-2.7.3/bin/hadoop fs -mkdir /user
$ hadoop-2.7.3/bin/hadoop fs -mkdir /user/{user-name}
$ hadoop-2.7.3/bin/hadoop fs -mkdir /user/{user-name}/warehouse
$ hadoop-2.7.3/bin/hadoop fs -chmod 777 /tmp
$ hadoop-2.7.3/bin/hadoop fs -chmod 777 /user/{user-name}/warehouse


Create hive-site.xml 

$ gedit apache-hive-2.1.0-bin/conf/hive-site.xml

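Add the following (replace {user-name} with your system username):

<configuration>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/user/{user-name}/warehouse</value>
  </property>
</configuration>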

Copy hive-jdbc-2.1.0-standalone.jar to lib:

$ cp apache-hive-2.1.0-bin/jdbc/hive-jdbc-2.1.0-standalone.jar apache-hive-2.1.0-bin/lib/


Initialise Hive with Derby, run:

$ ./apache-hive-2.1.0-bin/bin/schematool -dbType derby -initSchema


Run Hiveserver2:

$ ./apache-hive-2.1.0-bin/bin/hiveserver2

View the HiveServer2 logs:

$ tail -f /tmp/{user-name}/hive.log


Run Beeline on another terminal:

$ ./apache-hive-2.1.0-bin/bin/beeline -u jdbc:hive2://localhost:10000


Enable fully local mode execution: 

hive> SET mapreduce.framework.name=local;


Create table :

hive> CREATE TABLE pokes (foo INT, bar STRING);

Browse table


Setting up Hadoop to run on Single Node in Ubuntu 15.04

This is tested on hadoop-2.7.3.

Improvement on Hadoop documentation : http://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-common/SingleCluster.html

Step 1 

Make sure Java is installed

Installation instruction : http://suhothayan.blogspot.com/2010/02/how-to-set-javahome-in-ubuntu.html

Step 2

Install pre-requisites

$ sudo apt-get install ssh
$ sudo apt-get install rsync

Step 3

Setup Hadoop

$ gedit hadoop-2.7.3/etc/hadoop/core-site.xml

Add (replace {user-name} with your system username, e.g. "foo" for /home/foo/)


$ gedit hadoop-2.7.3/etc/hadoop/hdfs-site.xml 



Step 4


$ ssh localhost 

If it asks for a password, run:

$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
$ chmod 0600 ~/.ssh/authorized_keys

Try ssh localhost again.
If it still asks for a password, run the following and try again:

$ ssh-keygen -t rsa
# Press Enter at each prompt
$ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
$ chmod og-wx ~/.ssh/authorized_keys 

Step 5

Format the NameNode

$ ./hadoop-2.7.3/bin/hdfs namenode -format

Step 6 * Not provided in Hadoop Documentation 

Replace ${JAVA_HOME} with hardcoded path in hadoop-env.sh

$ gedit hadoop-2.7.3/etc/hadoop/hadoop-env.sh

Edit the file as follows:

# The java implementation to use.
export JAVA_HOME={path}/jdk1.8.0_111

Step 7

Start Hadoop 

$ ./hadoop-2.7.3/sbin/start-all.sh

The Hadoop daemon log output is written to the $HADOOP_LOG_DIR directory (defaults to $HADOOP_HOME/logs).

Browse the web interface for the NameNode; by default it is available at http://localhost:50070/


Step 8

Check that the processes are running:

$ jps


xxxxx NameNode
xxxxx ResourceManager
xxxxx DataNode
xxxxx NodeManager
xxxxx SecondaryNameNode

Step 9

Make HDFS directories for MapReduce jobs:

$ ./hadoop-2.7.3/bin/hdfs dfs -mkdir /user
$ ./hadoop-2.7.3/bin/hdfs dfs -mkdir /user/{user-name}

Sunday, March 13, 2016

Sensing the world with Data of Things

Henry Ford once said “Any customer can have a car painted any colour that he wants so long as it is black!”, but those days are long gone. Today people expect personalised treatment. Imagine calling customer service: every time you call, you have to go through all the standard questions, and the agents have no clue why you might be calling or whether you have called before. Shopping is similar: even if you are a regular customer with a platinum or gold membership card, you will not get any special treatment in the store; at best, presenting the card at the cashier gets you a discount.

What’s missing here? They don’t know enough about the customer to give a better service. The remedy is to build customer profiles, which can be done from the historical data you hold about the customer. Next you need to understand the context the customer is in, such as whether he is in a hurry or has contacted you before, and finally you have to react to that context in real time. Identifying the context is therefore the key to customer satisfaction, and today the best way of identifying it is through the devices your customer carries and the sensors around him, which is indeed the Internet of Things (IoT).

IoT is not a new thing. We have had lots of M2M systems that monitored and controlled devices in the past; what is new with IoT is that more devices have sensors and a single device has more sensors. IoT is an ecosystem in which devices are manufactured, apps for those devices are developed (e.g. apps for phones), users use those devices, and finally the devices are monitored and managed. WSO2’s IoT Platform plays a key role in managing and providing analytics for the IoT devices in the ecosystem.

Data Types in IoT Analytics

Data from IoT devices is time bound because these devices continuously monitor and report. This allows time series processing, such as tracking energy consumption over time. OpenTSDB is a specialised database built for time-based processing.
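To make the energy-consumption-over-time idea concrete, here is a minimal sketch that sums readings into hourly buckets. The class name HourlyEnergy, the millisecond timestamps and the watt-hour unit are assumptions made for illustration; a time series database such as OpenTSDB would do this kind of roll-up at scale.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: sums energy readings into one-hour buckets keyed by bucket start time. */
final class HourlyEnergy {
    private static final long HOUR_MILLIS = 3_600_000L;

    // TreeMap keeps the buckets sorted by time, which suits time series output.
    private final TreeMap<Long, Double> totals = new TreeMap<>();

    /** Adds one reading (assumed to be in watt-hours) to the bucket its timestamp falls into. */
    void record(long timestampMillis, double wattHours) {
        long bucketStart = Math.floorDiv(timestampMillis, HOUR_MILLIS) * HOUR_MILLIS;
        totals.merge(bucketStart, wattHours, Double::sum);
    }

    /** Read-only view of the totals, ordered by bucket start time. */
    Map<Long, Double> totals() {
        return Collections.unmodifiableMap(totals);
    }
}
```

Two readings taken within the same hour end up in one bucket, while a reading from the next hour starts a new one.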

Further, since IoT devices are deployed in various geographical locations and some of them move, location becomes another important data type for IoT devices. Devices are usually tracked with GPS, and iBeacons are currently used when the devices are within a building. Location-based data enables geospatial processing such as traffic planning and better route suggestions for vehicles. Geospatially optimised processing engines such as GeoTrellis are developed especially for these types of use cases.

IoT is Distributed

Since IoT is distributed by nature, components of the IoT network constantly get added and removed. IoT devices also connect to the network through all types of communication networks, from weak 3G networks to ad-hoc peer-to-peer networks, and they use various communication protocols such as Message Queuing Telemetry Transport (MQTT), Constrained Application Protocol (CoAP), ZigBee and Bluetooth Low Energy (BLE). Because of this, the data flow of the IoT network is continuously modified and repurposed. As the data load varies dynamically, an on-premise deployment will not be suitable, and hence we have to move towards public or hybrid cloud based deployments. To accommodate its distributed nature, IoT has an event-driven architecture in which sensors report data asynchronously as continuous event streams.

Analytics for IoT

IoT usually produces perishable data whose value degrades drastically over time, which makes Realtime Analytics important in IoT. With Realtime Analytics, temporal patterns, logical patterns, KPIs and thresholds can be detected and the respective stakeholders alerted immediately, for example raising an alarm when a temperature sensor hits a limit or notifying the driver via the car dashboard when the tire pressure is low. Systems such as Apache Storm, Google Cloud Dataflow & WSO2 CEP are built for implementing such Realtime Analytics use cases.

Realtime alone is not enough! We should be able to understand how the current situation deviates from the usual behaviour, and to do so we have to process historical data. With Batch Analytics, periodic summarisation and analytics can be performed on historical data, giving a baseline against which we can compare in realtime. The average temperature in a room last month and the total power usage of a factory last year are example summarisations that can be produced using systems like Apache Hadoop & Apache Spark on data stored in scalable databases such as Apache Cassandra and Apache HBase.
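As a rough illustration of comparing a live reading against a batch-computed summary, the sketch below derives a mean and standard deviation from historical values and flags readings that fall too far from the mean. The class name BaselineDetector and the "number of standard deviations" rule are assumptions chosen for the example; in practice the summary would come from a Hadoop or Spark job rather than an in-memory array.

```java
/** Hypothetical helper: flags readings that deviate from a baseline computed over historical data. */
final class BaselineDetector {
    private final double mean;
    private final double stdDev;

    /** Computes the baseline (mean and population standard deviation) from historical readings. */
    BaselineDetector(double[] history) {
        if (history.length == 0) {
            throw new IllegalArgumentException("history must not be empty");
        }
        double sum = 0;
        for (double v : history) {
            sum += v;
        }
        mean = sum / history.length;

        double squaredDiffs = 0;
        for (double v : history) {
            squaredDiffs += (v - mean) * (v - mean);
        }
        stdDev = Math.sqrt(squaredDiffs / history.length);
    }

    /** Returns true when the reading is more than the given number of standard deviations from the mean. */
    boolean isUnusual(double reading, double sigmas) {
        return Math.abs(reading - mean) > sigmas * stdDev;
    }
}
```

With last month's room temperatures as the history, a realtime reading only raises an alert when it is well outside what the room usually sees.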

OK, with Batch Analytics we defined the thresholds, and with Realtime Analytics we detected threshold violations and alerted on them. Notifying violations may help prevent disasters, but it does not stop similar issues arising again. For that we need to investigate the historical data, identify the root cause of the issue and eliminate it. This can be done through Interactive Analytics, whose ad-hoc queries let us search the data set to see how the system and all related entities behaved before the alert was raised. Apache Drill, Apache Lucene and indexed storage systems such as Couchbase are some of the systems that provide Interactive Analytics.

Rather than being reactive, staying a step ahead by predicting issues & opportunities brings great value. This can be achieved through Predictive Analytics, which helps in scenarios such as proactive maintenance, fraud detection and health warnings. Apache Mahout, Apache Spark MLlib, Microsoft Azure Machine Learning, WSO2 ML & Skytree are systems that can help us build Predictive Analytics models.

An Integrated Solution for IoT Analytics

By selecting WSO2 Siddhi, Apache Storm, Apache Spark, Apache Lucene, Apache HBase and Apache Spark MLlib from the above technologies, together with many other open source software components, WSO2 has built an integrated data analytics solution that supports Realtime, Batch, Interactive and Predictive analytics, called WSO2 Data Analytics Server.

Issues in IoT Analytics

Extreme Load

Compared with the scale of the data produced by sensors, centralised analytics platforms, even distributed ones, cannot scale, and even if they can, it will not be cost effective. Hence we should ask whether we need to process and/or store all the data produced by the sensors. In most cases we only need aggregations over time, trends that exceed thresholds, outliers, events matching a rare condition, and data from periods when the system is unstable or changing. For example, a temperature sensor only needs to send a reading when the temperature changes; there is no point in periodically sending the same value. This directs us to optimise the sensors or data collection points to do local processing before publishing data. It also helps in detecting issues quickly, since part of the data is already processed locally, and in notifying instantly, since decisions are taken at the edge. Taking decisions at the edge can be implemented with the help of complex event processing libraries such as WSO2 Siddhi and Esper.
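The temperature sensor example can be sketched as a small send-on-change filter running at the edge. This is only an illustration in plain Java, not Siddhi or Esper code; the class name ChangeFilter and the tolerance parameter are assumptions.

```java
/** Hypothetical edge filter: forwards a reading only when it has changed enough since the last one sent. */
final class ChangeFilter {
    private final double tolerance;
    private Double lastSent; // null until the first reading has been forwarded

    ChangeFilter(double tolerance) {
        this.tolerance = tolerance;
    }

    /** Returns true if the reading should be published upstream, and remembers it as the last sent value. */
    boolean shouldSend(double reading) {
        if (lastSent == null || Math.abs(reading - lastSent) > tolerance) {
            lastSent = reading;
            return true;
        }
        return false;
    }
}
```

The first reading is always sent; after that, readings within the tolerance of the last published value are dropped locally and never reach the network.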


Due to the distributed nature of IoT, the data produced can be duplicated, arrive out of order, go missing or even be wrong.

Redundant sensors & network latency can introduce duplicated events and out-of-order event arrival. This makes temporal event processing such as Time Windows & Pattern Matching difficult. These are very useful for use cases such as fraud detection and Realtime Soccer Analytics (based on the DEBS 2013 dataset) https://goo.gl/c2gPrQ where we built a system that monitors the soccer players and the ball and identifies ball kicks, ball possession, shots on goal & offsides. Algorithms based on K-Slack can help to order events before processing them in realtime.
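The basic K-Slack idea is to hold events back for up to K time units so that late arrivals can be slotted into place. Below is a minimal sketch with a fixed K, where an event is represented by its timestamp alone; the class name KSlackBuffer is an assumption, and adaptive variants that tune K at runtime are not shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical fixed-K reordering buffer: releases events in timestamp order once they are K behind the newest. */
final class KSlackBuffer {
    private final long k;
    private long maxSeen = Long.MIN_VALUE;
    private final PriorityQueue<Long> pending = new PriorityQueue<>(); // smallest timestamp first

    KSlackBuffer(long k) {
        this.k = k;
    }

    /** Buffers the event and returns, in order, every buffered event that is at least K older than the newest seen. */
    List<Long> add(long timestamp) {
        pending.add(timestamp);
        maxSeen = Math.max(maxSeen, timestamp);
        List<Long> ready = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek() <= maxSeen - k) {
            ready.add(pending.poll());
        }
        return ready;
    }

    /** Releases everything still buffered, in timestamp order (e.g. when the stream ends). */
    List<Long> flush() {
        List<Long> rest = new ArrayList<>();
        while (!pending.isEmpty()) {
            rest.add(pending.poll());
        }
        return rest;
    }
}
```

The trade-off is latency: a larger K tolerates later arrivals but delays every event by up to K, and events arriving more than K late are still emitted out of order.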

Due to network outages, data produced by IoT sensors can go missing. In these situations complementary sensor readings are very important: if one of the readings is some sort of aggregation done at the edge, we can use it to approximate the missing values. For example, when monitoring electricity we can publish both Load and Work readings, so that when some events are missed, a later Work reading lets us approximate the Load readings that should have arrived during the outage. The other alternative is using fault-tolerant data streams such as Google MillWheel.
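Here is a small sketch of the Load and Work example. It assumes that Work is a cumulative energy reading in watt-hours and Load is the instantaneous power in watts, so the average load over an outage is the change in Work divided by the elapsed time. The class and method names are made up for illustration.

```java
/** Hypothetical helper: approximates the average load during a gap from two cumulative work readings. */
final class LoadEstimator {
    private static final double MILLIS_PER_HOUR = 3_600_000.0;

    /**
     * Average load in watts between two cumulative work readings (watt-hours),
     * taken at startMillis and endMillis.
     */
    static double averageLoadWatts(double workStartWh, long startMillis,
                                   double workEndWh, long endMillis) {
        if (endMillis <= startMillis) {
            throw new IllegalArgumentException("end must be after start");
        }
        double hours = (endMillis - startMillis) / MILLIS_PER_HOUR;
        return (workEndWh - workStartWh) / hours;
    }
}
```

This only recovers the average over the gap, so short spikes during the outage are lost, but it is usually enough to keep downstream aggregations correct.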

Further, at times sensor readings won't be correct. This can be due to various reasons such as sensor quality and environmental noise. In such situations we can use Kalman filtering to smooth consecutive sensor readings for a better approximation. These types of issues are quite common when we use iBeacons for location sensing.
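For a single noisy value, such as the signal strength of one beacon, a one-dimensional Kalman filter is often enough. The sketch below assumes the true value is roughly constant between readings; the class name and the noise parameters are illustrative assumptions that would need tuning per sensor.

```java
/** Hypothetical scalar Kalman filter for smoothing consecutive readings of one sensor value. */
final class ScalarKalmanFilter {
    private final double processNoise;     // how much the true value may drift between readings
    private final double measurementNoise; // variance of the sensor noise
    private double estimate;
    private double errorVariance;

    ScalarKalmanFilter(double initialEstimate, double initialVariance,
                       double processNoise, double measurementNoise) {
        this.estimate = initialEstimate;
        this.errorVariance = initialVariance;
        this.processNoise = processNoise;
        this.measurementNoise = measurementNoise;
    }

    /** Feeds one raw reading into the filter and returns the smoothed estimate. */
    double update(double measurement) {
        // Predict: the value is assumed unchanged, but our uncertainty grows.
        errorVariance += processNoise;

        // Correct: move the estimate towards the measurement in proportion to the gain.
        double gain = errorVariance / (errorVariance + measurementNoise);
        estimate += gain * (measurement - estimate);
        errorVariance *= (1 - gain);
        return estimate;
    }
}
```

A larger measurementNoise makes the filter trust each reading less and smooth more; a larger processNoise makes it follow changes faster.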

Visualisation of IoT data

Visualisation is one of the most important aspects of effective analytics, and with Big Data and IoT it becomes even more complicated. Per-device & summarisation views are essential, and beyond that users should be able to visualise device groups based on categories such as device type, location, device owner type, deployed zone and many more. Since these categories are dynamic and each person monitoring the system has personal preferences, a composable & customisable dashboard is essential. Further, charts and graphs should be able to visualise the huge volume of stored data, where sampling & indexing techniques can be used for better responsiveness.
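One simple sampling technique for charts is to split the series into as many buckets as there are points to draw and plot the mean of each bucket. The sketch below shows that idea only; the class name Downsampler is an assumption, and real dashboards may prefer methods that preserve peaks.

```java
/** Hypothetical helper: reduces a long series to a fixed number of points by averaging equal-sized buckets. */
final class Downsampler {

    /** Returns at most {@code buckets} values, each the mean of one contiguous slice of the input. */
    static double[] bucketMeans(double[] values, int buckets) {
        if (buckets <= 0) {
            throw new IllegalArgumentException("buckets must be positive");
        }
        if (values.length <= buckets) {
            return values.clone(); // already small enough to plot as-is
        }
        double[] out = new double[buckets];
        for (int b = 0; b < buckets; b++) {
            // Integer arithmetic spreads any remainder evenly across the buckets.
            int start = (int) ((long) b * values.length / buckets);
            int end = (int) ((long) (b + 1) * values.length / buckets);
            double sum = 0;
            for (int i = start; i < end; i++) {
                sum += values[i];
            }
            out[b] = sum / (end - start);
        }
        return out;
    }
}
```

Averaging hides short spikes, so for alert-oriented charts a min/max per bucket may be a better choice than the mean.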

Communicating with devices

In IoT, sending a command or alert to a device is complicated, and to do so we have to use client-side polling based techniques. Here we store the data that needs to be pushed to the client in a database or queue and expose it via secured APIs (through systems like WSO2 API Manager).
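The polling approach boils down to a per-device mailbox: the server side enqueues commands, and each device collects its own when it next checks in. The following is an in-memory sketch only; the class name CommandMailbox is an assumption, the map stands in for the database or queue, and the secured API layer in front of it is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical in-memory mailbox holding pending commands until each device polls for them. */
final class CommandMailbox {
    private final Map<String, Queue<String>> pending = new HashMap<>();

    /** Called by the server side: stores a command for the given device. */
    synchronized void enqueue(String deviceId, String command) {
        pending.computeIfAbsent(deviceId, id -> new ArrayDeque<>()).add(command);
    }

    /** Called when a device polls: returns its pending commands in arrival order and clears them. */
    synchronized List<String> poll(String deviceId) {
        Queue<String> queue = pending.remove(deviceId);
        return queue == null ? new ArrayList<>() : new ArrayList<>(queue);
    }
}
```

Because the device initiates every exchange, this works even when devices sit behind NAT or are only intermittently connected, at the cost of a delay of up to one polling interval.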

Reference Architecture for IoT Analytics

Here data is collected through a message broker using a protocol such as MQTT and immediately written to disk by WSO2 Data Analytics Server (DAS). Meanwhile the collected data is cleaned in realtime, and the cleaned data is also persisted. In parallel, the cleaned data is fed into realtime event processing, which in turn sends alerts and provides realtime visualisations. The stored clean data is used by WSO2 Machine Learner (ML) to build machine learning models and deploy them to WSO2 DAS for realtime predictions. Further, the stored clean data is also used by Spark to run Batch Analytics producing summarisations, which are then visualised in dashboards.

It was a pleasure to present “Sensing the world with Data of Things” at Structure Data 2016, San Francisco. Please find the slides below.

Sunday, February 28, 2016

WSO2 Complex Event Processor 4.1

WSO2 Complex Event Processor 4.1 was released on 23rd February 2016.

This release mainly focuses on improving and stabilizing the product and enhancing its capabilities.

One of the main features included in this release is instrumentation and monitoring support for WSO2CEP as well as Siddhi. This enables users to identify the throughput and memory consumption of each and every component of WSO2CEP and Siddhi. Through this, users can identify possible bottlenecks in their queries and optimize CEP for better performance.

The same is also exposed via JMX so that it can be monitored with third-party JMX consumers such as jconsole.

Another important feature CEP introduced in this release is visualizing Siddhi queries.

One of the notable improvements in this release is its improved high-availability support. CEP can now support high availability with more than two nodes, providing zero downtime with no data loss. As it also stores its state as a periodic snapshot in a database, CEP can restore its state from the last available snapshot even after a full cluster restart. For more information refer to the documentation here.

Further, WSO2CEP has introduced several improvements to its core runtime complex event processing engine, Siddhi. They are as follows:

  • Hazelcast Event Table - Allows events to be stored and manipulated in a Hazelcast based In-Memory Data Grid.
  • Minima and Maxima detection - Detects maxima and minima in an event pattern; this allows detecting complex stock market patterns using combinations of maxima and minima.
  • Map extension - Introduced to support arbitrary key-value pairs in Siddhi, which was not supported for a long time. It allows users to create a map and to add, remove and check for keys and values within a hash map.
  • InsertOrUpdate to Event Table - Facilitates doing an insert or an update as an atomic operation on Event Tables.
  • Outer and left joins in Siddhi - In addition to inner joins, Siddhi now supports outer and left joins.
  • Time length window - The eviction policy of the Siddhi window is triggered by both time and length properties.
  • External Batch window - Allows a batch window to be triggered by event time rather than by system time.
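To illustrate what eviction by both time and length means for the time length window, here is a plain Java sketch of a window that drops an event as soon as it is either too old or pushed out by newer events. This is not Siddhi's implementation; the class name is an assumption, events are reduced to their timestamps, and timestamps are assumed to arrive in non-decreasing order.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sliding window that keeps at most maxLength events, none older than durationMillis. */
final class TimeLengthWindow {
    private final long durationMillis;
    private final int maxLength;
    private final Deque<Long> events = new ArrayDeque<>(); // oldest event first

    TimeLengthWindow(long durationMillis, int maxLength) {
        this.durationMillis = durationMillis;
        this.maxLength = maxLength;
    }

    /** Adds an event (by timestamp), applies both eviction rules, and returns the current window size. */
    int add(long timestamp) {
        // Time-based eviction: drop events that have fallen out of the time range.
        while (!events.isEmpty() && events.peekFirst() <= timestamp - durationMillis) {
            events.pollFirst();
        }
        events.addLast(timestamp);
        // Length-based eviction: drop the oldest events beyond the size limit.
        while (events.size() > maxLength) {
            events.pollFirst();
        }
        return events.size();
    }
}
```

During a burst the length limit does the evicting, and during quiet periods the time limit does, so the window stays bounded in both memory and staleness.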

You can download WSO2 Complex Event Processor 4.1 from here, its documentation from here, and find the latest Siddhi documentation here.

Tuesday, January 12, 2016

Adding Statistics to Siddhi 3

A new statistics feature has been added in Siddhi 3.0.5.

This helps users find out how their queries are behaving and understand which query to optimise for better performance and lower memory consumption.

This is easy to enable. Adding

"@plan:statistics(reporter = 'console', interval = '5' )"

to the execution plan starts statistics reporting on the console every 5 seconds.

You will see output similar to the following:
1/12/16 12:01:39 AM ============================================================

-- Gauges ----------------------------------------------------------------------
             value = 548804
             value = 556686

-- Meters ----------------------------------------------------------------------
             count = 2
         mean rate = 0.37 events/second
     1-minute rate = 0.40 events/second
     5-minute rate = 0.40 events/second
    15-minute rate = 0.40 events/second
             count = 0
         mean rate = 0.00 events/second
     1-minute rate = 0.00 events/second
     5-minute rate = 0.00 events/second
    15-minute rate = 0.00 events/second
             count = 3
         mean rate = 0.57 events/second
     1-minute rate = 0.60 events/second
     5-minute rate = 0.60 events/second
    15-minute rate = 0.60 events/second

-- Timers ----------------------------------------------------------------------
             count = 2
         mean rate = 0.38 calls/second
     1-minute rate = 0.40 calls/second
     5-minute rate = 0.40 calls/second
    15-minute rate = 0.40 calls/second
               min = 0.01 milliseconds
               max = 0.08 milliseconds
              mean = 0.05 milliseconds
            stddev = 0.04 milliseconds
            median = 0.08 milliseconds
              75% <= 0.08 milliseconds
              95% <= 0.08 milliseconds
              98% <= 0.08 milliseconds
              99% <= 0.08 milliseconds
            99.9% <= 0.08 milliseconds
             count = 2
         mean rate = 0.38 calls/second
     1-minute rate = 0.40 calls/second
     5-minute rate = 0.40 calls/second
    15-minute rate = 0.40 calls/second
               min = 0.01 milliseconds
               max = 0.01 milliseconds
              mean = 0.01 milliseconds
            stddev = 0.00 milliseconds
            median = 0.01 milliseconds
              75% <= 0.01 milliseconds
              95% <= 0.01 milliseconds
              98% <= 0.01 milliseconds
              99% <= 0.01 milliseconds
            99.9% <= 0.01 milliseconds

You can also add 

"@plan:statistics(reporter = 'jmx')"

to report stats via JMX.

You can find a sample implementation here.

Siddhi also allows users to add custom reporting tools. To do so, implement the StatisticsTrackerFactory interface:

package org.wso2.siddhi.core.util.statistics;

import org.wso2.siddhi.query.api.annotation.Element;
import java.util.List;

public interface StatisticsTrackerFactory {
    LatencyTracker createLatencyTracker(String name, StatisticsManager statisticsManager);
    ThroughputTracker createThroughputTracker(String name, StatisticsManager statisticsManager);
    MemoryUsageTracker createMemoryUsageTracker(StatisticsManager statisticsManager);
    StatisticsManager createStatisticsManager(List<Element> elements);
}

Then register your implementation with the Siddhi Manager:
siddhiManager.setStatisticsConfiguration(new StatisticsConfiguration(new MyMetricsFactory()));

Happy Coding :)