Big data is a term used to refer to data that is so large and complex that it cannot be stored or processed using traditional database software.
Big data analytics is the process of analyzing big data to discover useful information.
Apache Hadoop is an open-source framework that allows us to store and process big data in a parallel and distributed fashion on clusters of computers.
Hadoop has three main components.
Hadoop Distributed File System (HDFS – storage)
It allows us to store any kind of data across the cluster.
Map / Reduce (Processing)
It allows parallel processing of the data stored in HDFS.
YARN
It is the resource management and job scheduling technology for distributed processing.
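Once you are connected to the DSU cluster, two standard Hadoop commands give a quick, optional health check of these components (the exact output depends on the cluster configuration and your account's permissions): "hdfs dfsadmin -report" summarises HDFS capacity and the live DataNodes, and "yarn node -list" shows the NodeManagers registered with YARN.
dsu@master:~$ hdfs dfsadmin -report
dsu@master:~$ yarn node -list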
Hadoop was originally written to support MapReduce programming in Java, but it also allows MapReduce programs to be written in any language through a utility called Hadoop Streaming. Let's test some Hadoop MapReduce programs on the Data Science Unit (DSU) cluster.
The word count program reads a text file and counts how often each word occurs.
Let's run a wordcount.java program using the DSU Hadoop cluster.
Steps to run the program are given below:
Get a text file (sample_text.txt) and save it on the Desktop
Put the file into HDFS.
dsu@master:~$hadoop fs -mkdir -p /user/WordCount
dsu@master:~$hadoop fs -put /home/dsu/Desktop/sample_text.txt /user/WordCount
Here, /user/WordCount is the directory in HDFS where your file will be stored.
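To confirm that the file has reached HDFS, you can list the directory (a quick optional check):
dsu@master:~$hadoop fs -ls /user/WordCount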
Create a jar file for wordcount.java and save it on the Desktop
There are several ways to create a jar file (e.g., using Eclipse); you can find detailed instructions online.
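For example, if you prefer the command line to an IDE, the following sketch compiles the program and packages it into a jar. It assumes the Hadoop binaries are on your PATH and that the source file (saved as WordCount.java, matching the public class name) is in the current directory; the wordcount_classes directory name is only illustrative.
$ mkdir wordcount_classes
$ javac -classpath $(hadoop classpath) -d wordcount_classes WordCount.java
$ jar -cvf wordcount.jar -C wordcount_classes/ .
If you build the jar this way (without a Main-Class entry in the manifest), add the fully qualified class name (wordcount.WordCount) after the jar name when you run the hadoop jar command.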
Run the program
dsu@master:~$hadoop jar Desktop/wordcount.jar /user/WordCount/sample_text.txt /user/WordCount/output
The output will be stored inside the "/user/WordCount/output" directory.
View the output
dsu@master:~$hadoop fs -cat /user/WordCount/output/part-r-00000
By default, Hadoop names the reducer output file part-r-00000.
The word count program in Java is given below.
package wordcount;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.Path;

public class WordCount {

    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split each input line into tokens and emit (word, 1) for every token
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                value.set(tokenizer.nextToken());
                context.write(value, new IntWritable(1));
            }
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum the counts emitted for each word
            int sum = 0;
            for (IntWritable x : values) {
                sum += x.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        // Reads the default configuration of the cluster from the configuration xml files
        Configuration conf = new Configuration();
        // Initializing the job with the default configuration of the cluster
        Job job = Job.getInstance(conf, "WordCount");
        // Assigning the driver class name
        job.setJarByClass(WordCount.class);
        // Defining the mapper class name
        job.setMapperClass(Map.class);
        // Defining the reducer class name
        job.setReducerClass(Reduce.class);
        // Key type coming out of the mapper
        job.setMapOutputKeyClass(Text.class);
        // Value type coming out of the mapper
        job.setMapOutputValueClass(IntWritable.class);
        // Input format class, responsible for parsing the dataset into key-value pairs
        job.setInputFormatClass(TextInputFormat.class);
        // Output format class, responsible for writing the results as key-value pairs
        job.setOutputFormatClass(TextOutputFormat.class);
        // Setting the second argument as a path in a path variable
        Path outputPath = new Path(args[1]);
        // Configuring the input path from the file system into the job
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Configuring the output path from the file system into the job
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Deleting the output path automatically from HDFS so that we don't have to delete it explicitly
        outputPath.getFileSystem(conf).delete(outputPath, true);
        // Exiting the job only if the flag value becomes false
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Hadoop Streaming is a utility that comes with the Hadoop distribution. It allows us to write MapReduce programs in any language that can read from standard input and write to standard output.
Let’s code the word count program in Python and run it using Hadoop Streaming.
mapper.py
#!/usr/bin/env python
import sys

# Read lines from standard input, split them into words,
# and emit "word<TAB>1" for every word
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print '%s\t%s' % (word, 1)
reducer.py
#!/usr/bin/env python
from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# Input arrives sorted by word (Hadoop sorts the mapper output by key),
# so the counts for each word are contiguous and can be summed in one pass
for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    try:
        count = int(count)
    except ValueError:
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# Emit the count for the last word
if current_word == word:
    print '%s\t%s' % (current_word, current_count)
In the previous example, we uploaded the sample text file to HDFS. Save mapper.py and reducer.py in a folder (e.g., /home/dsu/Desktop/mapper.py and /home/dsu/Desktop/reducer.py).
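Before submitting the streaming job, you can sanity-check the two scripts locally with a Unix pipeline; the sort step imitates the shuffle phase that Hadoop performs between the map and reduce stages. This assumes the sample file is still on the Desktop, the scripts are executable, and the "python" interpreter on the machine is Python 2 (the scripts use Python 2 print statements):
$ chmod +x /home/dsu/Desktop/mapper.py /home/dsu/Desktop/reducer.py
$ cat /home/dsu/Desktop/sample_text.txt | /home/dsu/Desktop/mapper.py | sort -k1,1 | /home/dsu/Desktop/reducer.py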
Run the program as follows.
dsu@master:~$ hadoop jar /home/dsu/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.1.0.jar -file /home/dsu/Desktop/mapper.py -mapper mapper.py -file /home/dsu/Desktop/reducer.py -reducer reducer.py -input /user/WordCount/sample_text.txt -output /user/WordCount/wordcount_py
The output will be stored by default as part-00000 inside the /user/WordCount/wordcount_py directory in HDFS.
You can view the output as follows.
dsu@master:~$hadoop fs -cat /user/WordCount/wordcount_py/part-00000
Although Hadoop Streaming is the standard way to write MapReduce programs in languages other than Java, other libraries such as mrjob, Hadoopy and Pydoop also support MapReduce programming in Python.
In this section, we will code the above word count program in R using Hadoop Streaming. You can also use a package called RHadoop to write MapReduce programs in R.
mapper.R
#!/usr/bin/env Rscript
# mapper.R - Wordcount program in R
# script for Mapper (R-Hadoop integration)

trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line)
splitIntoWords <- function(line) unlist(strsplit(line, "[[:space:]]+"))

con <- file("stdin", open = "r")
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
    line <- trimWhiteSpace(line)
    words <- splitIntoWords(line)
    for (w in words)
        cat(w, "\t1\n", sep = "")
}
close(con)
reducer.R
#!/usr/bin/env Rscript
# reducer.R - Wordcount program in R
# script for Reducer (R-Hadoop integration)

trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line)

splitLine <- function(line) {
    val <- unlist(strsplit(line, "\t"))
    list(word = val[1], count = as.integer(val[2]))
}

env <- new.env(hash = TRUE)

con <- file("stdin", open = "r")
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
    line <- trimWhiteSpace(line)
    split <- splitLine(line)
    word <- split$word
    count <- split$count
    if (exists(word, envir = env, inherits = FALSE)) {
        oldcount <- get(word, envir = env)
        assign(word, oldcount + count, envir = env)
    } else {
        assign(word, count, envir = env)
    }
}
close(con)

for (w in ls(env, all = TRUE))
    cat(w, "\t", get(w, envir = env), "\n", sep = "")
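As with the Python scripts, you can test the R mapper and reducer locally before involving Hadoop; this assumes Rscript is installed on the machine and that the scripts are saved on the Desktop as in the streaming command below:
$ chmod +x /home/dsu/Desktop/mapper.R /home/dsu/Desktop/reducer.R
$ cat /home/dsu/Desktop/sample_text.txt | /home/dsu/Desktop/mapper.R | sort -k1,1 | /home/dsu/Desktop/reducer.R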
Run the code as follows.
dsu@master:~$ hadoop jar /home/dsu/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.1.0.jar -file /home/dsu/Desktop/mapper.R -mapper mapper.R -file /home/dsu/Desktop/reducer.R -reducer reducer.R -input /user/WordCount/sample_text.txt -output /user/WordCount/wordcount_R
You can view the output as we explained earlier.
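For reference, the command mirrors the Python example; the output directory comes from the -output flag above.
dsu@master:~$hadoop fs -cat /user/WordCount/wordcount_R/part-00000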
The Hadoop ecosystem includes other tools that address particular needs. At present, the DSU cluster accommodates Apache Pig, Apache Hive and Apache Flume.
Apache Pig provides an alternative, simpler way to write MapReduce programs using a language called Pig Latin. Non-programmers can learn this language easily and write MapReduce programs in Pig Latin; a couple of hundred lines of Java code can often be replaced by about ten lines of Pig Latin script. Pig includes built-in operations such as join, group, filter and sort, so we do not have to write code for those operations. Pig sits on top of Hadoop and uses the Pig engine to convert Pig Latin scripts into MapReduce jobs before execution.
Let’s code the “word count” program in Pig (word_count.pig).
lines = LOAD '/input_file.txt' AS (line:chararray);
words = FOREACH lines GENERATE FLATTEN(TOKENIZE(line)) as word;
grouped = GROUP words BY word;
wordcount = FOREACH grouped GENERATE group, COUNT(words);
DUMP wordcount;
Command to execute it.
dsu@master:~$pig word_count.pig
The output will be displayed on the terminal.
Apache Hive is a data warehousing package built on top of Hadoop for data analysis. Hive targets users who are comfortable with Structured Query Language (SQL). Hive uses an SQL-like language called HiveQL. Hive abstracts the complexity of Hadoop, so users do not need to write MapReduce programs; Hive queries are automatically converted into MapReduce jobs.
SQL is designed for tables that reside on a single machine, whereas in HDFS table data is distributed across multiple machines. HiveQL is used to analyze data in distributed storage.
Hive is read-oriented and therefore not suitable for transaction processing, which typically involves a high percentage of write operations (e.g., ATM transactions). It is also not suitable for applications that need very fast response times, as Hadoop is intended for long sequential scans.
The following demos use Pig and Hive.
Load data from Local File System to HDFS and HDFS to Hive Table.
Step 1: Creating a table in Hive
Step 2: Copying the data from local file system to HDFS
Step 3: Loading the data from HDFS to Hive table
These steps are demonstrated in detail.
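First, open the Hive shell from a terminal on the master node (the same command appears again later in this guide):
dsu@master:~$hive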
(Inside Hive Shell)
hive> create database hive_demo;
List down all the available databases.
hive> show databases;
Use the newly created database
hive> use hive_demo;
Create a table inside the newly created database (hive_demo).
hive> create table user_details (
    > user_id int,
    > name string,
    > age int,
    > country string,
    > gender string
    > )
    > row format delimited fields terminated by ','
    > stored as textfile;
Check whether the table is created using the following commands.
hive> show tables;
hive> desc user_details;
Copy the data file from the local file system to HDFS (run this from a regular terminal, not the Hive shell).
$hadoop fs -put /home/dsu/Desktop/dsu_review/userdata /
Back in the Hive shell, load the data from HDFS into the table.
hive> load data inpath '/userdata' into table user_details;
Now we can analyse the data using Hive Query Language.
hive> select * from user_details;
hive> select name from user_details where age >= 18;
Our task is to list the top 3 websites viewed by people under 16.
User Details (table name = "user_details") | Website Visited (table name = "clickstream") |
---|---|
1,Dilan,20,SL,M | 1,www.bbc.com |
2,Nisha,15,SL,F | 1,www.facebook.com |
3,Vani,14,SL,F | 1,www.gmail.com |
4,Steno,12,SL,F | 2,www.cnn.com |
5,Ajith,17,SL,M | 2,www.facebook.com |
 | 2,www.gmail.com |
 | 2,www.stackoverflow.com |
 | 2,www.bbc.com |
 | 3,www.facebook.com |
 | 3,www.stackoverflow.com |
 | 3,www.gmail.com |
 | 3,www.cnn.com |
 | 4,www.facebook.com |
 | 4,www.abc.com |
 | 4,www.stackoverflow.com |
 | 5,www.gmail.com |
 | 5,www.stackoverflow.com |
 | 5,www.facebook.com |
Here, there are two tables (user_details and clickstream). The clickstream table lists the websites viewed by each user. The user_details table was created in the previous demo, so we only need to create the clickstream table. Please follow the steps below.
hive> create table clickstream (userid int, url string)
    > row format delimited fields terminated by ','
    > stored as textfile;

hive> show tables;
hive> desc clickstream;
$hadoop fs -put /home/dsu/Desktop/dsu_review/clickstream /
hive> load data inpath '/clickstream' into table clickstream;
hive> select * from clickstream;
Task 1: Analyze the data using Hive
hive> select url, count(url) c from user_details u JOIN clickstream c ON (u.user_id=c.userid) where u.age<16 group by url order by c DESC limit 3;
Answer:
www.stackoverflow.com 3
www.facebook.com 3
www.gmail.com 2
Task 2: Analyze the data using Apache Pig
Load the userdata and clickstream files to HDFS. (The "load data inpath" commands in the Hive demo moved these files into the Hive warehouse, so they must be uploaded again.)
$hadoop fs -put /home/dsu/Desktop/dsu_review/userdata /
$hadoop fs -put /home/dsu/Desktop/dsu_review/clickstream /
Program (pig_demo.pig)
users = load '/userdata' using PigStorage(',') as (user_id, name, age:int, country, gender);
filtered = filter users by age < 16;
pages = load '/clickstream' using PigStorage(',') as (userid, url);
joined = join filtered by user_id, pages by userid;
grouped = group joined by url;
summed = foreach grouped generate group, COUNT(joined) as clicks;
sorted = order summed by clicks desc;
top3 = limit sorted 3;
dump top3;
Command to execute it.
$cd /home/dsu/Desktop/dsu_review
$pig pig_demo.pig
Answer:
(www.facebook.com,3)
(www.stackoverflow.com,3)
(www.gmail.com,2)
Apache Flume is an ecosystem tool used for streaming log files from applications into HDFS, e.g., downloading tweets and storing them in HDFS.
Steps in brief:
Step 1: Create a Twitter application from apps.twitter.com.
Step 2: Get the following credentials.
Consumer key
Consumer key (secret)
Access Token
Access Token Secret
Step 3: Prepare a configuration file to download tweets
Provide all the necessary details and the type of tweets that you want to download.
Step 4: Run the Flume agent with the configuration file
flume/bin$ ./flume-ng agent -n TwitterAgent -c conf -f ../conf/twitter.conf -Dtwitter4j.http.proxyHost=cachex.pdn.ac.lk -Dtwitter4j.http.proxyPort=3128
It will download tweets and store them in HDFS.
SQL vs NoSQL Databases
SQL - Structured Query Language
NoSQL - Not Only SQL
Relational Database Management Systems (RDBMS) use SQL. Simple SQL queries (e.g., select * from [table name]) are used to retrieve data. RDBMSs do not natively support distributed storage (storing data across multiple computers).
NoSQL databases address distributed storage, but they do not provide the standard SQL interface.
HBase is a NoSQL database built on top of the Hadoop Distributed File System (HDFS). Data is stored as key-value pairs, and both keys and values are stored as byte arrays.
HBASE | RDBMS |
---|---|
Column-oriented | Row-oriented (mostly) |
Flexible Schema, add columns dynamically | Fixed Schema |
Good with sparse tables | Not optimized for sparse tables (too many null values) |
Joins are implemented with MapReduce and are not optimized | Optimized for joins |
Leverages MapReduce for distributed batch processing | No built-in MapReduce-style batch processing |
Good for semi-structured data as well as structured data | Good for structured data |
Scales linearly and automatically with new nodes | Usually scales vertically by adding more hardware resources |
HBASE | HIVE |
---|---|
Well suited for CRUD (Create, Read, Update, Delete) | Not well suited for CRUD |
Maintains multiple versions of each cell | Versioning not supported |
Less support for aggregations. E.g. find max/min/avg of a column | Good support for aggregations |
No table joins | Supports table joins |
Lookups are very fast, so read activity is very fast | Reads are comparatively slow (designed for batch queries) |
HBase has a Java API, which is the primary (first-class) interface. Other programmatic interfaces (e.g., a REST API) are available as well.
HBase does not have an inbuilt SQL interface. There are non-native SQL interfaces available for HBase (e.g. Apache Phoenix, Impala, Presto, & Hive).
We are going to demonstrate how to create a table, store, update, retrieve and delete data, and drop a table using both the HBase shell and the Java API.
The following table will be used for the demonstration.
Employee (table name):

eid | personal: name | personal: gender | professional: experience | professional: salary |
---|---|---|---|---|
'eid01' | 'Rahul' | 'male' | 5 | 80000 |
'eid02' | 'Priya' | 'female' | 2 | 50000 |
A column family groups a set of columns together; column families do not exist in an RDBMS.
You need to connect to the DSU master node and type 'hbase shell' in a terminal. This opens the HBase shell.
dsu@master:~$ hbase shell
hbase(main)>
When creating a table, we only need to provide the table name and the column families; column names are not required.
hbase(main)> create 'table-name', 'column-family-1', 'column-family-2', ..., 'column-family-n'
hbase(main)> create 'employee', 'personal', 'professional'
To check whether the table was created, use the 'list' command, which displays all the tables present in HBase.
hbase(main)>list
The 'put' command is used to insert data.
Column names are specified when the values are inserted.
hbase(main)> put 'table-name', 'row-id', 'column-family:column-name', 'value'
hbase(main)> put 'employee', 'eid01', 'personal:name', 'Rahul'
hbase(main)> put 'employee', 'eid01', 'personal:gender', 'male'
hbase(main)> put 'employee', 'eid01', 'professional:experience', '5'
hbase(main)> put 'employee', 'eid01', 'professional:salary', '80000'
hbase(main)> put 'employee', 'eid02', 'personal:name', 'Priya'
hbase(main)> put 'employee', 'eid02', 'personal:gender', 'female'
hbase(main)> put 'employee', 'eid02', 'professional:experience', '2'
hbase(main)> put 'employee', 'eid02', 'professional:salary', '50000'
The 'scan' command displays all the rows of a table.
hbase(main)> scan 'table-name'
hbase(main)> scan 'employee'
Get all the records of 'eid01'
hbase(main)> get 'employee', 'eid01'
Get the personal details of ‘eid01’
hbase(main)> get 'employee', 'eid01', 'personal'
Get the name of ‘eid01’
hbase(main)> get 'employee', 'eid01', 'personal:name'
The 'put' command performs an upsert (update or insert).
If the row_id exists, the content will be updated.
If the row_id does not exist, a new row will be inserted.
Let's change 'Rahul' to 'Rajiv' and run the command again. The value will be updated.
hbase(main)> put 'employee', 'eid01', 'personal:name', 'Rajiv'
Delete a particular column from a column family.
hbase(main)> delete 'employee', 'eid01', 'personal:gender'
To drop a table, disable it first and then drop it.
hbase(main)> disable 'employee'
hbase(main)> drop 'employee'
Now, we will perform the above operations using the Java API.
The demonstration uses Eclipse, an integrated development environment for Java.
Eclipse is installed in DSU/HBase master node.
Please follow these steps carefully.
Step 1: Create a new Java project in Eclipse, name it as you wish and click ‘Finish’.
Step 2: Add the external jars from /dsu/home/hbase-1.48/lib.
To do that, right-click the project you created → 'Build Path' → 'Configure Build Path' → 'Libraries' → 'Add External JARs' → select all → 'Apply and Close'.
These jars contain the HBase classes.
Some classes and their functionalities are listed below.
Class | Functionalities |
---|---|
HBaseAdmin | Creates a table, checks whether a table exists, disables a table, drops a table |
HTableDescriptor | Responsible for describing and handling tables |
HColumnDescriptor | Handles column families |
HBaseConfiguration | Reads the HBase configuration |
HTable | Responsible for interacting with an HBase table (DML operations); it provides methods such as get, put and delete |
Now you are ready to write HBase programs.
Create a Java class 'HBaseDDL.java' inside src (you can use any class name you want).
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseDDL {

    public static void main(String[] args) throws IOException {
        // Create a configuration object;
        // it will store all the default configuration of HBase
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);

        // Describe the 'employee' table with its two column families
        HTableDescriptor des = new HTableDescriptor(Bytes.toBytes("employee"));
        des.addFamily(new HColumnDescriptor("personal"));
        des.addFamily(new HColumnDescriptor("professional"));

        // If the table already exists, drop it before recreating it
        if (admin.tableExists("employee")) {
            System.out.println("Table Already exists!");
            admin.disableTable("employee");
            admin.deleteTable("employee");
            System.out.println("Table: employee deleted");
        }

        admin.createTable(des);
        System.out.println("Table: employee successfully created");
    }
}
Create a class 'PutDemo.java' to insert records into the 'employee' table.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PutDemo {

    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        try {
            HTable table = new HTable(conf, "employee");

            // Insert the first record (row key 'eid01')
            Put put = new Put(Bytes.toBytes("eid01"));
            put.add(Bytes.toBytes("personal"), Bytes.toBytes("name"), Bytes.toBytes("Rahul"));
            put.add(Bytes.toBytes("professional"), Bytes.toBytes("exp"), Bytes.toBytes("4"));
            table.put(put);
            System.out.println("inserted record eid01 to table employee ok.");

            // Insert the second record (row key 'eid02')
            put = new Put(Bytes.toBytes("eid02"));
            put.add(Bytes.toBytes("personal"), Bytes.toBytes("name"), Bytes.toBytes("Suraj"));
            put.add(Bytes.toBytes("professional"), Bytes.toBytes("exp"), Bytes.toBytes("2"));
            table.put(put);
            System.out.println("inserted record eid02 to table employee ok.");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
In the above PutDemo class, change the name for 'eid01' from 'Rahul' to 'Rajiv' and rerun it. The value will be updated.
Create a class 'GetDemo.java' to retrieve the record with row key 'eid01'.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class GetDemo {

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "employee");

        // Fetch the row with key 'eid01' and print every cell it contains
        Get get = new Get(Bytes.toBytes("eid01"));
        Result rs = table.get(get);
        for (KeyValue kv : rs.raw()) {
            System.out.print(new String(kv.getRow()) + " ");
            System.out.print(new String(kv.getFamily()) + ":");
            System.out.print(new String(kv.getQualifier()) + " ");
            System.out.print(kv.getTimestamp() + " ");
            System.out.println(new String(kv.getValue()));
        }
    }
}
Create a class 'DeleteDemo.java' to delete the row with key 'eid01'.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;

public class DeleteDemo {

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "employee");

        // Delete the entire row with key 'eid01'
        Delete del = new Delete(Bytes.toBytes("eid01"));
        table.delete(del);
        System.out.println("Row eid01 deleted");
    }
}
Create a class 'ScanDemo.java' to display all the records of the 'employee' table.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;

public class ScanDemo {

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "employee");

        // Scan the table and print every cell of every row
        Scan sc = new Scan();
        ResultScanner rs = table.getScanner(sc);
        System.out.println("Get all records\n");
        for (Result r : rs) {
            for (KeyValue kv : r.raw()) {
                System.out.print(new String(kv.getRow()) + " ");
                System.out.print(new String(kv.getFamily()) + ":");
                System.out.print(new String(kv.getQualifier()) + " ");
                System.out.print(kv.getTimestamp() + " ");
                System.out.println(new String(kv.getValue()));
            }
        }
    }
}
Apache Sqoop is a tool in Hadoop ecosystem that is designed for effectively transferring data between Hadoop and Relational Database Management Systems (RDBMS) such as MySQL, Microsoft SQL Server, Oracle DB, and others.
Sqoop removes the difficulty of writing MapReduce programs to transfer data between Hadoop and an RDBMS: when a Sqoop command is executed, Sqoop internally generates MapReduce code.
SQOOP = [SQ]L + HAD[OOP]
A MySQL database is used for the following demonstration; the steps are described below.
MySQL is installed in the master node of DSU Hadoop Cluster. Once you are connected to the cluster, you need to log in to the MySQL server.
dsu@master:~$mysql -u root -p
It will prompt you to enter the password. The MySQL credentials will be issued when you create an account at DSU.
mysql>
Now, you are logged into MySQL.
Let’s create a database.
mysql> create database faculty;
List down all the databases that are present in the MySQL server.
mysql> show databases;
Use the newly created database.
mysql> use faculty;
Let's create a table called 'student' and populate it with the values given below.
Student (table name):

student_id | name | age | gender | hometown |
---|---|---|---|---|
1001 | Dilshan | 21 | Male | Colombo |
1002 | Ajith | 23 | Male | Mannar |
1003 | Dulani | 20 | Female | Kandy |
1004 | Samantha | 24 | Female | Kandy |
1005 | Rohit | 22 | Male | Galle |
mysql> create table student (
    student_id int not null,
    name varchar(50) not null,
    age int,
    gender varchar(50),
    hometown varchar(50));
To check whether the table is created properly:
mysql> show tables;
It will display all the tables available inside the database 'faculty'.
mysql> describe student;
It will describe the schema of the table 'student'.
Let's insert the values into the table.
mysql> insert into student (student_id, name, age, gender, hometown) values (1001, 'Dilshan', 21, 'male', 'Colombo');
mysql> insert into student (student_id, name, age, gender, hometown) values (1002, 'Ajith', 23, 'male', 'Mannar');
mysql> insert into student (student_id, name, age, gender, hometown) values (1003, 'Dulani', 20, 'female', 'Kandy');
mysql> insert into student (student_id, name, age, gender, hometown) values (1004, 'Samantha', 24, 'female', 'Kandy');
mysql> insert into student (student_id, name, age, gender, hometown) values (1005, 'Rohit', 22, 'male', 'Mannar');
Let's say you have mistakenly typed 'Mannar' instead of 'Galle' as Rohit's hometown (student_id 1005). We can correct it as follows.
mysql> update student set hometown = 'Galle' where student_id = 1005;
Now the table is created inside MySQL. To check whether all the values were inserted properly:
mysql> select * from student;
We have seen how to display the available databases in the MySQL command line interface (CLI) using "show databases;".
Let’s connect to MySQL via SQOOP and list down the databases in Sqoop’s CLI.
Please refer to the following official Apache Sqoop user guide for more details about the options that will be used throughout the demonstration.
http://sqoop.apache.org/docs/1.4.7/SqoopUserGuide.html
First, check whether MySQL server is running in the local (master) machine.
dsu@master:~$service mysql status
If it is active, you will see an "active (running)" message in green. If not, please contact the DSU.
Press ‘q’ to exit the message.
List down all the available databases inside MySQL via Sqoop.
To do that, we have to make a connection to the MySQL server from Sqoop using the JDBC driver. The complete command is given below.
dsu@master:~$ sqoop list-databases --connect jdbc:mysql://localhost --username root -P
It will ask you to enter the password.
After the password is given, Sqoop converts this command into a MapReduce job, runs it in the background and displays all the available databases.
List down all the available tables inside a particular database via Sqoop
dsu@master:~$ sqoop list-tables --connect jdbc:mysql://localhost/faculty --username root -P
It will display all the tables present inside the 'faculty' database. Since we have not created any other tables, only the 'student' table will be listed.
Run simple SQL queries via Sqoop CLI
Display all the records of student table:
dsu@master:~$ sqoop eval --connect jdbc:mysql://localhost/faculty --username root -P --query "select * from faculty.student"
Display selected records (students aged 23 or older):
dsu@master:~$ sqoop eval --connect jdbc:mysql://localhost/faculty --username root -P --query "select * from faculty.student where faculty.student.age >= 23"
So far, we have analyzed data present in the MySQL server via Sqoop's CLI. In this section, we will import data from the MySQL database into the Hadoop cluster. Please note that the MySQL server runs on a single machine as a local server and is not part of our Hadoop cluster.
Import the 'student' table into HDFS
dsu@master:~$ sqoop import --connect jdbc:mysql://localhost/faculty --username root -P --table student -m 1
The 'student' table will be stored in the Hadoop default location "/user/dsu". We can also provide a desired location where the table should be stored, as follows.
dsu@master:~$ sqoop import --connect jdbc:mysql://localhost/faculty --username root -P --table student --target-dir /sqooptest
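To verify the import, you can list and print the files Sqoop wrote to HDFS; Sqoop's map tasks write part-m-* files by default.
dsu@master:~$hadoop fs -ls /sqooptest
dsu@master:~$hadoop fs -cat /sqooptest/part-m-*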
We have already stored ‘student’ table data inside HDFS (/user/dsu/student). Let’s export the table data back to MySQL and store it in a new table called ‘student_new’.
Create a ‘student_new’ table.
mysql> create table student_new (student_id int not null primary key, name varchar(50) not null, age int, gender varchar(50), hometown varchar(50));
Now we can export the data from HDFS to the newly created table.
dsu@master:~$ sqoop export --connect jdbc:mysql://localhost/faculty --username root -P --table student_new --export-dir /user/dsu/student -m 1
mysql> select * from student_new;
You will see that the new table is populated with the data imported from HDFS.
Import the 'student' table from MySQL directly into a Hive table.
dsu@master:~$ sqoop import --connect jdbc:mysql://localhost/faculty --username root -P --table student --target-dir /hive_student --hive-import --create-hive-table --hive-table hive_student
Now a Hive table (hive_student) is created and populated. (The --hive-table option names the Hive table; without it, Sqoop would use the source table name 'student'.)
Display the content of the newly created hive table
Go to the Hive shell and execute the following commands.
dsu@master:~$hive
hive>select * from hive_student;
Apache Mahout is a library of scalable machine-learning algorithms that run on top of Hadoop using the MapReduce paradigm.
Mahout supports the following machine-learning tasks, among others.
Collaborative filtering
Mines user behavior patterns to make recommendations (e.g., Amazon product recommendations)
Clustering
Grouping of objects based on their characteristics (e.g., categorizing newspaper articles as politics, sports, entertainment, and so on).
Classification
Classify new observations into a set of existing classes based on a training set of data containing observations whose classes are known (e.g., decide whether a new email is spam).
An example of each of the above use cases is illustrated step by step using the DSU cluster.
This example recommends movies that a user may be interested in, based on the ratings she has provided for other movies.
We utilize a publicly available dataset called “MovieLens 20M” from the following website.
https://grouplens.org/datasets/movielens/
The MovieLens 20M dataset contains 20 million ratings of 27,000 movies by 138,000 users.
This dataset contains several files; this example needs only the "u.data" file.
The structure of the u.data file is as follows.
[user id] [movie id] [rating] [timestamp]
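If you want to see what these records look like, you can preview the first few lines of the file locally (using the same local path as in Step 2 below):
$head -5 /home/dsu/Desktop/ml-20m/u.data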
Step 1:
Download the MovieLens 20M dataset and extract it.
Step 2:
Copy u.data into HDFS.
$hadoop fs -mkdir /movielens
$hadoop fs -put /home/dsu/Desktop/ml-20m/u.data /movielens
Step 3:
Run the item-based recommender algorithm on u.data with appropriate parameters and values. You may use a different algorithm.
$mahout recommenditembased --input /movielens/u.data --output /movielens/itemrecomds --similarityClassname SIMILARITY_PEARSON_CORRELATION --numRecommendations 5
# recommenditembased – the machine learning algorithm we are using
# --input – location of input file
# --output – location where the output is to be stored
# --similarityClassname – a desired similarity measure that is available in Mahout
# --numRecommendations – number of movies recommended per user
View the output.
$hadoop fs -cat /movielens/itemrecomds/part-r-*
Format of the output file is as follows.
[user id] [movie_id: predicted rating of that user for that movie, ….]
Here, we group similar types of news articles together. For example, all news articles related to politics are grouped together.
Step 1:
Create multiple text files where each text file contains a news article. Keep them in a single folder and upload this folder to HDFS.
Assume we have a folder called sl_news on the Desktop containing six text files, where each text file contains a news article (2 political, 2 sports and 2 entertainment articles).
$hadoop fs -mkdir /sl_news
$hadoop fs -put /home/dsu/Desktop/sl_news/* /sl_news/
Step 2:
Convert the text files to sequence files using the command “seqdirectory”. Sequence files are binary files containing key-value pairs.
$mahout seqdirectory -i /sl_news/ -o /sl_news/kmeansseqfiles -ow
# -i input directory
# -o output directory (sequence file)
# -ow If set, overwrite the output directory
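If you want to confirm the conversion, "hadoop fs -text" can decode sequence files. The exact file name inside the output directory may vary (chunk-0 is typical for seqdirectory output), so list the directory first:
$hadoop fs -ls /sl_news/kmeansseqfiles
$hadoop fs -text /sl_news/kmeansseqfiles/chunk-0 | head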
Step 3:
Convert sequence files into tf-idf vectors.
$mahout seq2sparse -i /sl_news/kmeansseqfiles -o /sl_news/kmeanstfidffiles -ow
#seq2sparse – a library to convert sequence files into tf-idf vectors
Step 4:
Perform clustering using the 'kmeans' algorithm.
$mahout kmeans -i /sl_news/kmeanstfidffiles/tfidf-vectors/ -c /sl_news/kmeanscentroids -o /sl_news/kmeansclusters -k 3 -ow -x 50 -dm org.apache.mahout.common.distance.CosineDistanceMeasure
# -c destination directory for centroids
# -k number of clusters
# -x number of iterations
# -dm distance measure
Here, we set k=3 as we have three types of news (politics, sports, & entertainment).
Step 5:
Dump the clusters into a readable text file using 'clusterdump'.
$mahout clusterdump -d /sl_news/kmeanstfidffiles/dictionary.file-0 -dt sequencefile -i /sl_news/kmeansclusters/clusters-1-final -n 20 -b 100 -o /home/dsu/Desktop/dump-file.txt -p /sl_news/kmeansclusters/clusteredPoints/
You can view the clusters by typing,
$cat /home/dsu/Desktop/dump-file.txt
In our case, there are three clusters, and the twenty most frequent terms in each cluster are displayed.
A publicly available dataset is used for this demonstration. The dataset is called “Enron”. It can be downloaded from https://www.cs.cmu.edu/~./enron/.
This dataset has two folders: one for ham (legitimate) emails and the other for spam. Each email is stored as a separate text file inside these folders.
Step 1:
Copy the dataset into HDFS.
$hadoop fs -mkdir /enron
$hadoop fs -mkdir /enron/spam
$hadoop fs -mkdir /enron/ham
$hadoop fs -copyFromLocal /home/dsu/Desktop/enron/ham/* /enron/ham
$hadoop fs -copyFromLocal /home/dsu/Desktop/enron/spam/* /enron/spam
Step 2:
Convert the text data into sequence file format.
$mahout seqdirectory -i /enron -o /enron/nbseqfiles
Step 3:
Convert sequence data into tf-idf vectors.
$mahout seq2sparse -i /enron/nbseqfiles -o /enron/nbsparse
Step 4:
Split the data into training and test datasets.
# -i files directory (should use tfidf-vectors)
# --trainingOutput ( directory for training data)
# --testOutput (directory for test data)
# --randomSelectionPct (percentage of data to put into the training set)
# --overwrite - overwrite current data
# --sequenceFiles (indicating that the files are of sequence form)
# -xm (type of processing. Sequential / mapreduce)
$mahout split -i /enron/nbsparse/tfidf-vectors --trainingOutput /enron/nbTrain --testOutput /enron/nbTest --randomSelectionPct 20 --overwrite --sequenceFiles -xm sequential
Step 5:
Build the Naive Bayes (NB) model.
# -i Training files data
# -li path to store the label index
# -o path to store the model
# -ow overwrite
# -c train complementary naivebayes
$mahout trainnb -i /enron/nbTrain -li /enron/nbLabels -o /enron/nbmodel -ow -c
Step 6:
Testing the model using the testing dataset.
# -i test data directory
# -m model directory
# -l - labels
# -ow overwrite
# -o predictions directory
# -c complementary naive bayes
$mahout testnb -i /enron/nbTest -m /enron/nbmodel -l /enron/nbLabels -ow -o /enron/nbpredictions -c
The performance metrics of this model are displayed on the terminal.
The DSU is working on adding more tools from the Hadoop ecosystem in the near future. Please contact us for further details.