MapReduce Thought Experiment

MapReduce: What is it?

As a way to explain what MapReduce is and how it introduces parallelism into what was once a single-threaded task, I’m going to use the very simple and admittedly scoff-ably inane example of counting how many times each word in a book occurs in that book (or collection of books). While this is an impractical use of the technology, the point here is not to show that we can count, but rather to demonstrate how we can take something that is single-threaded and turn it into a parallelized process, with a few optimizations tossed in.

Before we apply any technology to the matter, let’s first approach this as a thought experiment. At its most basic level, the map part of MapReduce takes the data that is to be processed and breaks it into tuples: key-value pairs. The reduce part takes the output from map as its input and combines the mapped tuples into a smaller set of tuples in some way (thus reducing the output). Were I to take the book we’re going to process and tear it into two roughly equal parts, give you half and keep half, and ask you to count the words in your half while I count the words in my half, that would be ‘map’. When we’re done with our respective counting and we take our results to consolidate them into a single list of words and counts, that’s ‘reduce’.

I estimate that an average person could count the number of occurrences of each word on one page of a decent-sized book (let’s imagine it’s something like ‘War and Peace’) in somewhere between 15 and 20 minutes. We’ll split the difference and call it 18 minutes. If the book in question is 1,000 pages long, that puts it at 18,000 minutes, or 300 hours, for one person to count the number of times each word occurs. As we discussed, if we split the book in two, we can then, in theory, get our job done in half the time. But wait: that’s only for the ‘map’ part. When we split the book into two parts, we also created the need to put the two result sets together at the end. We’ve gained the efficiency of parallelizing our task, but at the cost of adding a ‘reduce’ task to the end of our work. Still, it’s got to be faster than doing the job alone.
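If you want to play with that arithmetic, here is a quick Python sketch of the back-of-the-envelope estimate. The 18-minutes-per-page figure and the per-split consolidation cost are, of course, just assumed numbers for illustration:

MINUTES_PER_PAGE = 18          # assumed time for one person to 'map' a single page
PAGES = 1000                   # assumed length of the book
REDUCE_MINUTES_PER_SPLIT = 60  # assumed cost of consolidating one helper's results

def estimated_hours(workers):
    """Rough wall-clock estimate: map work splits evenly, reduce adds per-split overhead."""
    map_minutes = (MINUTES_PER_PAGE * PAGES) / workers
    reduce_minutes = 0 if workers == 1 else REDUCE_MINUTES_PER_SPLIT * workers
    return (map_minutes + reduce_minutes) / 60

for n in (1, 2, 4, 8):
    print(f"{n} worker(s): ~{estimated_hours(n):.1f} hours")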

The Map

Let’s go a step further and now define how each of us is going to count the words on a given page. As discussed, ‘map’ takes the input and breaks it into tuples to pass on to ‘reduce’. So, in this case, let’s define map to be taking each word as it occurs on a page and making that word the key of a tuple, with the value being simply a ‘tick’, or ‘1’, to represent that this word has occurred once (or one more time). I’d guess this changes how quickly an individual can ‘map’ a page. There’s no adding to be done, no lookup to find where the tally of a given word might have previously been recorded, and no initializing a tally if this is the first time we’ve come across a particular word. It’s simply: record the word and a ‘1’. Knowing that we’re going to have to apply some ‘reduce’ task to our output, we can apply a bit of forethought and record our ‘map’ results onto index cards rather than in a notebook.
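To look ahead to the Hadoop Streaming implementation promised for the next post, here is a minimal Python sketch of what such a ‘map’ step might look like. The tokenization (splitting on whitespace, case left alone) is a simplifying assumption, not something Hadoop dictates:

#!/usr/bin/env python
# mapper.py -- a sketch of the 'map' step: emit a (word, 1) tuple for every word seen
import sys

for line in sys.stdin:                 # each line of the book arrives on stdin
    for word in line.strip().split():  # naive tokenization: split on whitespace
        # one "index card" per occurrence: the word is the key, '1' is the value
        print(f"{word}\t1")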

The Sort

Now, at the end of our respective ‘map’ tasks, we each have a (very large!) stack of index cards. How shall we consolidate and ‘reduce’ all of this data? I propose that I take our stacks, put them together, and sort them by the key of each tuple, creating a series of sorted stacks. When I’m through with the sort, I can then give the first half of the cards to you, now sorted and stacked, so that you can continue to help and do half of the ‘reduce’ work, while I do the other half.

The Reduce

In this case, ‘reduce’ is pretty easy. Each of the presorted stacks of index cards simply needs to be counted and a tally tuple recorded onto a new index card. When we are each done with our ‘reduce’ task, all that needs to be done is to collect the tally tuples, in the already sorted order, first from you, then from me, and voila: we have a sorted list of each word that occurred in the book along with the count of how many times that word occurred.
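Continuing the same Python sketch, the ‘reduce’ step might look like the following. It assumes its input arrives sorted by key, exactly as the presorted stacks of index cards do; that assumption is what makes the simple running tally work:

#!/usr/bin/env python
# reducer.py -- a sketch of the 'reduce' step: tally the sorted (word, 1) tuples
import sys

current_word = None
count = 0

for line in sys.stdin:                       # sorted "index cards" arrive one per line
    word, value = line.rstrip("\n").split("\t")
    if word == current_word:
        count += int(value)                  # same stack: keep tallying
    else:
        if current_word is not None:
            print(f"{current_word}\t{count}")  # finished a stack: record its tally tuple
        current_word = word
        count = int(value)

if current_word is not None:                 # don't forget the last stack
    print(f"{current_word}\t{count}")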

Map-Sort-Reduce Redux

To recap, we have distilled our job of counting the number of times each word occurs in a book down into the following tasks (the whole pipeline is simulated in the short Python sketch after this list):

  1. Split the input. In this task, the book to be processed is split into ‘n’ number of equal pieces, one for each friend who has agreed to help us ‘map’.
  2. Execute the ‘map’ in parallel. Each friend helping to ‘map’ is given their split of the book. They each independently go off to create a stack of index cards containing a word and a ‘1’ for every occurrence of every word in their split of the book.

  3. Sort and Divvy. When everyone is done with their ‘map’, I collect all of the index cards and sort them into stacks by word. When the sort is complete, I divvy up the sorted stacks for each friend staying to help ‘reduce’. Note that the ‘map’ and ‘reduce’ are independent. There could be 5 friends helping to ‘map’ but only 2 left to ‘reduce’ or vice-versa, 3 could help ‘map’ and 4 more come over to give us 7 working on ‘reduce’.

  4. Execute the ‘reduce’ in parallel. Each friend staying to help ‘reduce’ is given their divvy of the mapped and sorted cards. Each presorted stack is counted and a tally tuple is recorded.

  5. Collect the tally tuples. The tally tuples are collected in the already sorted order.
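Before introducing Hadoop at all, this whole map-sort-reduce pipeline can be simulated on a single machine in a few lines of Python. This is only a sketch of the five steps above (the line-based splitting and whitespace tokenization are simplifying assumptions):

from itertools import groupby
from operator import itemgetter

def split_input(text, n):
    """Step 1: split the 'book' into n roughly equal pieces, by line."""
    lines = text.splitlines()
    size = max(1, -(-len(lines) // n))  # ceiling division
    return ["\n".join(lines[i:i + size]) for i in range(0, len(lines), size)]

def map_split(split):
    """Step 2: each 'friend' turns their split into (word, 1) tuples."""
    return [(word, 1) for word in split.split()]

def reduce_stack(stack):
    """Step 4: tally one presorted stack of identical words."""
    return (stack[0][0], sum(count for _, count in stack))

def word_count(text, n_mappers=4):
    splits = split_input(text, n_mappers)
    mapped = [pair for split in splits for pair in map_split(split)]   # steps 1-2
    mapped.sort(key=itemgetter(0))                                     # step 3: sort
    stacks = (list(group) for _, group in groupby(mapped, key=itemgetter(0)))
    return [reduce_stack(stack) for stack in stacks]                   # steps 4-5

print(word_count("the quick brown fox jumps over the lazy dog\nthe end"))

Hadoop’s contribution, as we’ll see, is doing exactly this across many machines, with the splitting, sorting and shuffling handled for us.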

Next Article

In the next post in this series, I’ll cover Hadoop Streaming and how to implement what we’ve covered as a thought experiment in code.

Data Science 101: Machine Learning – Probability

Srinath Sridhar, an engineer with BloomReach, has recorded a 5-part video series giving a fairly comprehensive introduction to the world of Machine Learning. If you’ve ever asked yourself, “What is Machine Learning?”, or wondered how it actually works, these videos are for you!

This first video goes over some of the fundamental definitions of statistics that are necessary as a foundation to understanding and analyzing the machine learning algorithms that are to be examined in following videos. The presentation defines random variable, sample space, probability, expectation, standard deviation and variance and goes over examples of discrete and continuous probability distributions.
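As a quick refresher on the discrete case (this little example is mine, not taken from the videos), expectation, variance and standard deviation can all be computed directly from a probability mass function:

import math

# A hypothetical discrete distribution: outcomes of a loaded six-sided die.
pmf = {1: 0.1, 2: 0.1, 3: 0.1, 4: 0.1, 5: 0.1, 6: 0.5}

expectation = sum(x * p for x, p in pmf.items())                    # E[X]
variance = sum(p * (x - expectation) ** 2 for x, p in pmf.items())  # Var(X) = E[(X - E[X])^2]
std_dev = math.sqrt(variance)                                       # sd(X) = sqrt(Var(X))

print(f"E[X] = {expectation:.2f}, Var(X) = {variance:.2f}, sd(X) = {std_dev:.2f}")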


Hands-On Hadoop V2.2.0 – Part 2 – Installation

YARN Architecture

In the last article, we loaded a fresh installation of Ubuntu Linux 12.04.4 LTS Server into a Hyper-V virtual machine. We went on to install development tools and the source code for Hadoop v2.2.0, and we compiled the Hadoop source code to produce a clean, 64-bit install package. In this tutorial, we’ll use that installation package to create a single-node, stand-alone, pseudo-distributed installation of Hadoop that we can then use to experiment and learn. The basic outline of what we’ll be doing for this installation is as follows:

  1. Load a fresh installation of Ubuntu Linux 12.04.4 LTS Server with only the base software and no additional software packages.
  2. Assuming the Linux installation is in Hyper-V, verify that Hyper-V Integration Components are available.
  3. Install O/S updates.
  4. Install Java 7 JDK
  5. Create a dedicated hadoop user and group
  6. Install ssh; configure for public key authentication
  7. Configure PuTTY (for Windows) for Public Key Authentication
  8. Turn off IPv6
  9. Update hostname and /etc/hosts file
  10. Download, extract and install Hadoop (V2.2.0); including configuration files
  11. Configure environment variables
  12. Create and configure DFS and tmp files

Install base system with NO additional packages

If needed, refer to the previous tutorial for instructions on installing both a virtual machine with Hyper-V and a base Linux system with Ubuntu Linux 12.04.4 LTS Server. When prompted for the hostname, if you give the system the name “master”, you won’t have to change it later. In addition, when prompted for the initial user name, use the username “hadoop” and you will be able to skip the step of adding that user later.

Hyper-V Integration Services

If you’re installing into a virtual machine with Hyper-V, you’ll want to make sure that the Hyper-V Integration Services components have been loaded and are active. Hyper-V Integration Services are conveniences supplied by Microsoft which help the guest operating system (in our case, 64-bit Ubuntu 12.04.4 LTS Server) to operate with greater transparency on the host system (in my particular case, MS Windows Server 2008 R2). These are things like network and video drivers and software to help out the mouse.

The good news is that the Hyper-V Integration Services are built into Ubuntu 12.04 LTS (and have been for some time, since Ubuntu 10.10) and do not require a separate download and installation. To verify that the integration components are loaded into the running kernel, run the following command:

lsmod | grep hv

If all is well, the output should display something similar to the following:

hadoop@master:~$ lsmod | grep hv
hv_netvsc   31677  0
hv_utils    18348  0
hv_storvsc  22357  2
hv_vmbus    48420  5 hid_hyperv,hyperv_fb,hv_netvsc,hv_utils,hv_storvsc

Take note: The version of the Integration Components built into Ubuntu 12.04 LTS is not the exact version that Windows Server 2012 or Windows Server 2008 R2 is expecting. You might see event log entries similar to the ones below. These messages can be safely ignored.

  • Networking driver on ‘Virtual Machine’ loaded but has a different version from the server. Server version 3.2 Client version 2.0 (Virtual machine ID DC1CCF5C-0C1A-4825-B32C-9A4F8F85AA9D). The device will work, but this is an unsupported configuration. This means that technical support will not be provided until this problem is resolved. To fix this problem, upgrade the integration services. To upgrade, connect to the virtual machine and select Insert Integration Services Setup Disk from the Action menu.
  • A storage device in ‘Virtual Machine’ loaded but has a different version from the server. Server version 4.2 Client version 2.0 (Virtual machine ID DC1CCF5C-0C1A-4825-B32C-9A4F8F85AA9D). The device will work, but this is an unsupported configuration. This means that technical support will not be provided until this problem is resolved. To fix this problem, upgrade the integration services. To upgrade, connect to the virtual machine and select Insert Integration Services Setup Disk from the Action menu.

Install Updates

After the initial system has been installed and started up for the first time, one of the first tasks should be to apply any available updates. The following two commands will accomplish this:

sudo apt-get update
sudo apt-get dist-upgrade

Install Java 7 JDK

As Hadoop is a Java framework, it requires the Java 7 JDK to function.

There are two options for the Java 7 JDK.

  1. Oracle Java 7 JDK
  2. OpenJDK Java 7 JDK

Either the Oracle JDK or OpenJDK will work; however, the OpenJDK version is the path of least resistance and the easiest to install. Instructions for both follow:

Install OpenJDK Java 7 JDK

sudo apt-get install openjdk-7-jdk

Install Oracle Java 7 JDK

The Oracle JDK is the official JDK; however, it is no longer provided by Oracle as a default installation for Ubuntu. It can, however, still be installed using apt-get. To install it, execute the following commands:

sudo apt-get install python-software-properties
sudo apt-add-repository ppa:webupd8team/java
sudo apt-get update
sudo apt-get install oracle-java7-installer

Verify the Java Version

java -version

Set JAVA_HOME and update PATH

Setting the “JAVA_HOME” environment variable

To set the JAVA_HOME environment variable, which is needed for some programs, first find out the path of your Java installation:

sudo update-alternatives --config java

It should return something like:

There are 3 choices for the alternative java (providing /usr/bin/java).
 
Selection  Path                                         Priority   Status
---------------------------------------------------------
* 0        /usr/lib/jvm/java-7-oracle/jre/bin/java         1062    auto mode
  1        /usr/lib/jvm/java-6-openjdk-amd64/jre/bin/java  1061    manual mode
  2        /usr/lib/jvm/java-7-openjdk-amd64/jre/bin/java  1051    manual mode
  3        /usr/lib/jvm/java-7-oracle/jre/bin/java         1062    manual mode
 
Press enter to keep the current choice[*], or type selection number:

The installation path for each is:

/usr/lib/jvm/java-7-oracle
/usr/lib/jvm/java-6-openjdk-amd64
/usr/lib/jvm/java-7-openjdk-amd64

Copy the path from your preferred installation and then edit the file /etc/environment:

sudo vi /etc/environment

In this file, add the following line (replacing YOUR_PATH by the just copied path):

JAVA_HOME="YOUR_PATH"

That should be enough to set the environment variable. Now reload this file:

source /etc/environment

Test it by executing:

echo $JAVA_HOME

If it returns the just set path, the environment variable has been set successfully. If it doesn’t, please make sure you followed all steps correctly.


Create a Dedicated hadoop User and Group

Create a dedicated user and group to serve as the “application id” for hadoop. If you told the system to create the initial username as hadoop during installation, this user will already exist and it will be in a group called “hadoop” as well. You may skip these commands if that is the case. If you created some other username, you will need to execute these commands. The first creates a group called “hadoop”; the second creates a user named “hadoop” within the group “hadoop”.

sudo addgroup hadoop
sudo adduser --ingroup hadoop hadoop

Add User to the sudoers Group

If you created the “hadoop” user at installation, this is another command you may skip.  If not, it adds the “hadoop” user to the group allowed to execute commands as root via sudo.

sudo usermod -a -G sudo hadoop

Configure SSH and Public Key Authentication

Next on the list of items to install to get Hadoop operational is SSH. SSH will be used for communication between the nodes, making use of public-private key pairs, which allows us to forgo the use of passwords but still remain secure.

  1. Install the most current version of the SSH server
    sudo apt-get install openssh-server
  2. Verify that the SSH server is running
    sudo netstat -tulpn | grep :22

    Assuming that all is well, you should see something similar to the following:

    tcp    0   0   0.0.0.0:22   0.0.0.0:*   LISTEN   1043/sshd
    tcp6   0   0   :::22        :::*        LISTEN   1043/sshd
  3. Login as the hadoop user (assuming you aren’t already logged in as “hadoop”) and generate an RSA keypair
    su - hadoop
    ssh-keygen -t rsa -P ""

    When asked to enter the file in which to save the key that is being generated, press ‘Enter’ to accept the default of:

    ~/.ssh/id_rsa
  4. Enable ssh access with the newly created key
    cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
  5. Test SSH
    ssh localhost

Configure PuTTY (for Windows) for Public Key Authentication

This section is completely optional. If, however, you are a Windows user, there is a utility called PuTTY which will facilitate connecting to your Linux system(s) from Windows using SSH. Additionally, it can make use of the public key authentication we just enabled on the Linux system. If you don’t already have it, links to PuTTY are as follows:

  • You should be able to find the PuTTY homepage HERE.
  • A direct link to the PuTTY download page can be found HERE.
  • A direct link to download the PuTTY windows installer can be found HERE.
  1. 'cat' Private Key to Screen, Copy and Paste to Notepad

    Copy the private key from ~/.ssh/id_rsa to a file called hadoop.prv on the Windows PC. The simplest way of doing this is to `cat` the private keyfile to the screen, use the mouse to copy the text of the key, and paste it into Notepad. Save the file somewhere you’ll be able to find it again under the name “hadoop.prv”.

    cat ~/.ssh/id_rsa
  2. PuTTYGen - Load Foreign Key to Convert

    The format of SSH keys used by PuTTY differs from OpenSSH. The PuTTYGen.exe utility, however, has the functionality to convert an OpenSSH key to the format required by PuTTY. Run the PuTTYGen.exe utility on the Windows PC and select the “Load” button next to the “Load an existing private key file” label.

  3. PuTTYGen - Foreign Key Imported

    Locate the hadoop.prv file and select it to load into PuTTYGen. If all has gone well, you will be greeted with a message beginning: “Successfully imported foreign key”. You may click the “OK” button on this dialog.

  4. PuTTYGen - Save the Updated Private Key

    Now select the button labeled “Save private key” next to the “Save the generated key” label. Take care to *NOT* select the “Save public key” button.

  5. PuTTYGen - Answer 'Yes' to Save Without Passphrase

    When prompted if you are sure you want to save the key without a passphrase, select “Yes”. Specify the name of the new private key as “hadoop.ppk” when prompted to save the file.

  6. PuTTY Configuration - Specify IP Address, Port Number, Protocol (SSH), Session Name and go to Data Screen

    Launch PuTTY. In the Session screen, enter the IP address of the Ubuntu system to which you are connecting. Confirm that the Port is “22” and that the connection type is set to “SSH”. In “Saved Sessions”, give the connection a name such as hadoop@master. On the left side of the screen, select “Data” just below the section labeled “Connection”. This will take you to the next screen we need to modify.

  7. PuTTY Configuration - Specify Login Username, Go to Authentication Screen

    In the “Auto-login username” field, enter “hadoop”. On the left side of the screen, select ‘Auth’ below the section labeled ‘SSH’ to go to the last PuTTY screen to update.

  8. PuTTY Configuration - Specify the Private Key File, Return to Session Screen

    Select the “Browse” button next to the field labeled “Private key file for authentication”. Locate and select the hadoop.ppk file you generated earlier with PuTTYGen. On the left side of the screen, at the very top of the list in the section labeled ‘Category:’, select ‘Session’ to go back to the first screen we edited.

  9. PuTTY Configuration - Save the Session and Open the Connection

    Select the “Save” button to save these configuration parameters under the name specified. Click the “Open” button. If all has gone well, you will be prompted to accept the initial connection and then logged into the Ubuntu system as the hadoop user.


Turn off IPV6

Hadoop can have issues with IPv6. The easiest and safest way to resolve these issues is to simply disable IPv6. Edit /etc/sysctl.conf and add these lines to the bottom of the file:

sudo vi /etc/sysctl.conf
net.ipv6.conf.all.disable_ipv6 = 1
net.ipv6.conf.default.disable_ipv6 = 1
net.ipv6.conf.lo.disable_ipv6 = 1

Then run:

sudo sysctl -p

or

sudo shutdown -r now

To check if IPv6 is enabled or disabled, from a terminal window:

cat /proc/sys/net/ipv6/conf/all/disable_ipv6

A value of 0 means IPv6 is enabled; 1 means it is disabled.


Update hostname and the /etc/hosts file

We want to name this system “master”. While at the present time it’s only a master of itself, this will come in handy in the future. If you specified the hostname to be “master” at installation, you may skip this step.

We need to change the hostname and then configure our hosts file.

sudo hostname master
sudo vi /etc/hostname

The hostname file should have a single line:

master

Next, modify /etc/hosts file and just after the localhost line, add an entry identifying ‘master’:

127.0.0.1 localhost
10.20.1.60 master

This is assuming your machine has the IP address 10.20.1.60. Make sure that this address is accessible from other machines on your network. We will be using it for looking at some stats via the web interface.


Obtain and Extract the Hadoop Installation Tarball

Now we can start setting up Hadoop. Copy the tarball you created in the previous tutorial over to master. If you don’t want to go through the process of compiling for 64-bit, I’ve made a precompiled installation tarball available HERE, or you can execute the following commands from your Linux system:

cd ~
wget https://dl.dropboxusercontent.com/u/24159970/64-bit-hadoop-2.2.0-ubuntu-12.04.4.tar.gz
cd /usr/local
sudo tar zxf ~/hadoop-2.2.0.tar.gz  # adjust the filename to match the tarball you copied or downloaded
sudo chown hadoop:hadoop -R hadoop-2.2.0/  # change ownership
sudo ln -s hadoop-2.2.0 hadoop  # create a symbolic link for future upgrades
sudo chown hadoop:hadoop -R hadoop # change ownership of the symlink

Inside of /usr/local/hadoop are two directories which contain executable files: bin and sbin. These need to be added to the path definition. This can be done in a number of ways; one way, for example, would be to append the following to the “PATH=” statement in the /etc/environment file:

sudo vi /etc/environment
:/usr/local/hadoop/bin:/usr/local/hadoop/sbin

An alternate method would be to modify the “export PATH=” statement in your .bashrc file in a similar manner. It’s your choice how to make the change, but it is important that you do make it!

After you’ve made your change to /etc/environment, .bashrc, or whatever your choice was, you need to have it take effect. The easiest way to do this is to log out and back in; alternately, you can source the file with the ‘.’ or ‘source’ command:

source /etc/environment   # or: source ~/.bashrc, if that is the file you edited


Create a Location for the Distributed File System (DFS)

sudo mkdir -p /data/hadoop/tmp           # tmp space for hadoop
# hadoop.tmp.dir
sudo mkdir -p /data/hadoop/hdfs/namenode # namenode metadata
# dfs.namenode.name.dir
sudo mkdir -p /data/hadoop/hdfs/datanode # datanode data
# dfs.datanode.data.dir
sudo chown hadoop:hadoop /data/hadoop -R

Configuration Files

We’re now to the main course of our hadoop banquet. In this section, we’ll create and fill in a number of configuration files that control how and where our installation of hadoop will do what it does.

While there are a number of files and a few of them are a bit long, they can all be broken down into easy to understand bits. As we go through the files, I’ll strive to explain what each of the configuration options we’re putting in does.

Our start will be to create a symbolic link to the directory that contains the configuration file templates and prototypes:

cd /usr/local/hadoop
sudo ln -s etc/hadoop conf
sudo chown hadoop:hadoop -R conf

Configuration File: core-site.xml

The first configuration file to edit is: /usr/local/hadoop/conf/core-site.xml

vi /usr/local/hadoop/conf/core-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 
<!--
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at
 
 http://www.apache.org/licenses/LICENSE-2.0
 
 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License. See accompanying LICENSE file.
-->
 
<!-- Put site-specific property overrides in this file. -->
<configuration>
 
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/data/hadoop/tmp</value>
    <description>A base for temporary directories.</description>
  </property>
 
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://master:9000/</value>
    <description>
      The name of the default file system. A URI whose scheme and authority 
      determine the FileSystem implementation. The uri's scheme determines 
      the config property (fs.SCHEME.impl) naming the FileSystem 
      implementation class. The uri's authority is used to determine the 
      host, port, etc. for a filesystem.
    </description>
  </property>
 
</configuration>
Configuration Property: hadoop.tmp.dir

The property hadoop.tmp.dir tells Hadoop where to put its temporary data. If Hadoop requires additional directories for temp files, it will create them as subdirectories within this location.

Configuration Property: fs.defaultFS

The property fs.defaultFS tells Hadoop where to look for HDFS. Previous versions of Hadoop called this setting fs.default.name; that name is deprecated in favor of fs.defaultFS. If, when you later attempt to list the HDFS directory, you see the local filesystem instead, it is an indication that this parameter is set incorrectly. Note the reference to the hostname master. The number following the hostname is the port on which the service will be started; it is best left at the default.

Configuration File: mapred-site.xml

The next configuration file is: /usr/local/hadoop/conf/mapred-site.xml

vi /usr/local/hadoop/conf/mapred-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 
<!--
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at
 
 http://www.apache.org/licenses/LICENSE-2.0
 
 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License. See accompanying LICENSE file.
-->
 
<!-- Put site-specific property overrides in this file. -->
<configuration>
 
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
    <description>
      The runtime framework for executing MapReduce jobs. Can be one of 
      local, classic or yarn.
    </description>
  </property>
 
</configuration>
Configuration Property: mapreduce.framework.name

The property ‘mapreduce.framework.name’ simply specifies which framework to use for executing mapreduce jobs.  The only framework we’ll be dealing with is ‘yarn’.

Configuration File: hdfs-site.xml

The third configuration file to create is: /usr/local/hadoop/conf/hdfs-site.xml

vi /usr/local/hadoop/conf/hdfs-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 
<!--
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at
 
 http://www.apache.org/licenses/LICENSE-2.0
 
 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License. See accompanying LICENSE file.
-->
 
<!-- Put site-specific property overrides in this file. -->
<configuration>
 
  <property>
    <name>dfs.permissions.superusergroup</name>
    <value>hadoop</value>
  </property>
 
  <property>
    <name>dfs.replication</name>
    <value>1</value>
    <description>
      Default block replication. The actual number of replications 
      can be specified when the file is created. The default of 3 
      is used if replication is not specified.
    </description>
  </property>
 
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>file:/data/hadoop/hdfs/namenode</value>
  </property>
 
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>file:/data/hadoop/hdfs/datanode</value>
  </property>
 
</configuration>
Configuration Property: dfs.permissions.superusergroup

The super-user is the user with the same identity as the name node process itself. Loosely, if you started the name node, then you are the super-user. The super-user can do anything, in that permission checks never fail for the super-user. There is no persistent notion of who the super-user is; when the name node is started, the process identity determines who is the super-user for now. The HDFS super-user does not have to be the super-user of the name node host, nor is it necessary that all clusters have the same super-user. Also, an experimenter running HDFS on a personal workstation conveniently becomes that installation’s super-user without any configuration.

In addition, the administrator may identify a distinguished group using a configuration parameter. If set, members of this group are also super-users. The property ‘dfs.permissions.superusergroup’ is that configuration parameter. We’re setting it to be the ‘hadoop’ group.

Configuration Property: dfs.replication

The ‘dfs.replication’ property specifies the default block replication for a file. The actual number of replications can be specified when the file is created. A default of 3 is used if dfs.replication is not specified.

Configuration Property: dfs.namenode.name.dir

The parameter ‘dfs.namenode.name.dir’ determines where on the local filesystem the DFS name node should store the name table (fsimage). For redundancy, this can be a comma-delimited list of directories.  If so, the name table is replicated into all of the specified directories.

Configuration Property: dfs.datanode.data.dir

The ‘dfs.datanode.data.dir’ parameter is used to determine where on the local filesystem a DFS data node should store its blocks. If this is a comma-delimited list of directories, then data will be stored in all of the named directories. These directories are typically specified to be on different physical devices. Directories that do not exist are ignored.

Configuration File: yarn-site.xml

The next configuration file, conf/yarn-site.xml, has a greater number of parameters than the other files we’re editing, but if taken one at a time, is still relatively simple.

vi /usr/local/hadoop/conf/yarn-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at
 
 http://www.apache.org/licenses/LICENSE-2.0
 
 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License. See accompanying LICENSE file.
-->
 
<!-- Site specific YARN configuration properties -->
 
<configuration>
 
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
 
  <property>
    <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
  </property>
 
  <property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
  </property>
 
  <property>
    <name>yarn.dispatcher.exit-on-error</name>
    <value>true</value>
  </property>
 
  <property>
    <name>yarn.app.mapreduce.am.staging-dir</name>
    <value>/tmp/hadoop-yarn/staging</value>
  </property>
 
  <property>
    <name>yarn.resourcemanager.scheduler.address</name>
    <value>master:8030</value>
  </property>
 
  <property>
    <name>yarn.resourcemanager.resource-tracker.address</name>
    <value>master:8031</value>
  </property>
 
  <property>
    <name>yarn.resourcemanager.address</name>
    <value>master:8032</value>
  </property>
 
  <property>
    <name>yarn.resourcemanager.admin.address</name>
    <value>master:8033</value>
  </property>
 
  <property>
    <name>yarn.web-proxy.address</name>
    <value>master:8034</value>
  </property>
 
  <property>
    <name>yarn.resourcemanager.webapp.address</name>
    <value>master:8088</value>
  </property>
 
</configuration>
Configuration Property: yarn.nodemanager.aux-services

This is the auxiliary service name. The default is ‘…, mapreduce_shuffle’.

Our configuration is set to default; <value>mapreduce_shuffle</value>.

Configuration Property: yarn.nodemanager.aux-services.mapreduce.shuffle.class

This is the auxiliary service class to use.  The default is: org.apache.hadoop.mapred.ShuffleHandler.

Our configuration is set to the default; <value>org.apache.hadoop.mapred.ShuffleHandler</value>.

Configuration Property: yarn.log-aggregation-enable

Logs of Hadoop jobs serve multiple purposes. First and foremost, they can be used to debug issues while running a MapReduce application – correctness problems with the application itself, race conditions when running on a cluster, and debugging task/job failures due to hardware or platform bugs. Secondly, one can do historical analyses of the logs to see how individual tasks in job/workflow perform over time. One can even analyze the Hadoop MapReduce user-logs using Hadoop MapReduce(!) to determine those performance issues.

Handling of logs generated by applications has been one of the biggest pain-points for Hadoop installations in the past. In Hadoop 1.x, logs are left on individual nodes by the TaskTracker, and the management of the log files on local nodes is both insufficient for long term analyses as well as non-deterministic for user access.

YARN Log-Aggregation Details

Instead of truncating user-logs, and leaving them on individual nodes for a certain amount of time as was done by the TaskTracker in previous versions of Hadoop, the NodeManager addresses the log management issue by providing the option to move these logs onto a file-system (FS), e.g. HDFS, after the application completes.

  • Logs for all the containers belonging to a single Application and that ran on a given NM are aggregated and written out to a single (possibly compressed) log file at a configured location in the FS.
  • In the current implementation, once an application finishes, one will have
    • an application level log-dir and
    • a per-node log-file that consists of logs for all the containers of the application that ran on this node.
  • Users have access to these logs via command line tools, the web-UI or directly from the FS – no longer just restricted to a web only interface.
  • These logs can potentially be stored for much longer times than was possible in 1.x, given that they are stored on a distributed file-system.
  • We don’t need to truncate logs to very small lengths – as long as the log sizes are reasonable, we can afford to store the entire logs.
  • In addition, while the containers are running, the logs are now written to multiple directories on each node for effective load balancing and improved fault tolerance.
  • AggregatedLogDeletionService is a service that periodically deletes aggregated logs. Today it is run inside the MapReduce JobHistoryServer only.
General log related configuration properties
  • yarn.nodemanager.log-dirs: Determines where the container-logs are stored on the node when the containers are running. Default is ${yarn.log.dir}/userlogs.
    • An application’s localized log directory will be found in ${yarn.nodemanager.log-dirs}/application_${appid}.
    • Individual containers’ log directories will be below this, in directories named container_{$containerId}.
    • For MapReduce applications, each container directory will contain the files stderr, stdin, and syslog generated by that container.
    • Other frameworks can choose to write more or less files, YARN doesn’t dictate the file-names and number of files.
  • yarn.log-aggregation-enable: Whether to enable log aggregation or not. If disabled, NMs will keep the logs locally (like in 1.x) and not aggregate them.
Properties respected when log-aggregation is enabled
  • yarn.nodemanager.remote-app-log-dir: This is on the default file-system, usually HDFS, and indicates where the NMs should aggregate logs to. This should not be the local file-system, otherwise serving daemons like the history-server will not be able to serve the aggregated logs. Default is /tmp/logs.
  • yarn.nodemanager.remote-app-log-dir-suffix: The remote log dir will be created at {yarn.nodemanager.remote-app-log-dir}/${user}/{thisParam}. Default value is “logs”.
  • yarn.log-aggregation.retain-seconds: How long to wait before deleting aggregated logs; -1 or a negative number disables the deletion of aggregated logs. One needs to be careful not to set this to too small a value so as not to burden the distributed file-system.
  • yarn.log-aggregation.retain-check-interval-seconds: Determines how long to wait between aggregated-log retention checks. If it is set to 0 or a negative value, the value is computed as one-tenth of the aggregated-log retention time. As with the previous property, one needs to be careful not to set this too low. Defaults to -1.
  • yarn.log.server.url: Once an application is done, NMs redirect web UI users to this URL where aggregated-logs are served. Today it points to the MapReduce specific JobHistory.
Properties respected when log-aggregation is disabled
  • yarn.nodemanager.log.retain-seconds: Time in seconds to retain user logs on the individual nodes if log aggregation is disabled. Default is 10800.
  • yarn.nodemanager.log.deletion-threads-count: Determines the number of threads used by the NodeManagers to clean-up logs once the log-retention time is hit for local log files when aggregation is disabled.

The only one of these parameters we’re setting is ‘yarn.log-aggregation-enable’, to ‘true’, to tell the NodeManager to aggregate our logs. All of the other settings discussed above fall to their default values.

Configuration Property: yarn.dispatcher.exit-on-error

This setting determines whether the dispatcher should exit the daemon when it hits an error. By default it is false, so that tests are not affected; for all daemons it should be explicitly set to true so that they crash instead of hanging around in a broken state. We’ve set this to ‘true’.

Configuration Property: yarn.app.mapreduce.am.staging-dir

This is the staging directory used while submitting jobs. We’ve set this to ‘/tmp/hadoop-yarn/staging’.

Configuration Property: yarn.resourcemanager.scheduler.address

This is the address of the scheduler interface.

The default is: ${yarn.resourcemanager.hostname}:8030

Our configuration is set to be: master:8030

Configuration Property: yarn.resourcemanager.resource-tracker.address

This is the address of the resource tracker interface.

The default is: ${yarn.resourcemanager.hostname}:8031

Our configuration is set to be: master:8031

Configuration Property: yarn.resourcemanager.address

This is the address of the applications manager interface in the ResourceManager.

The default is: ${yarn.resourcemanager.hostname}:8032

Our configuration is set to be: master:8032

Configuration Property: yarn.resourcemanager.admin.address

This is the address of the ResourceManager admin interface.

The default is: ${yarn.resourcemanager.hostname}:8033

Our configuration is set to be: master:8033

Configuration Property: yarn.web-proxy.address

This is the address for the web proxy as HOST:PORT. If this is not given, the proxy will run as part of the ResourceManager.

Our configuration is set to be: master:8034

Configuration Property: yarn.resourcemanager.webapp.address

This is the http address of the Resource Manager web application.

The default is: ${yarn.resourcemanager.hostname}:8088

Our configuration is set to be: master:8088

Configuration File: capacity-scheduler-site.xml

The last XML configuration file to create is: /usr/local/hadoop/conf/capacity-scheduler-site.xml

Create it by simply copying the template file:

cd /usr/local/hadoop/conf
cp capacity-scheduler.xml capacity-scheduler-site.xml

Nothing needs to be changed in this file. I have included it here so you can see what it is and have an explanation of its contents.

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 
<!--
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at
 
 http://www.apache.org/licenses/LICENSE-2.0
 
 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License. See accompanying LICENSE file.
-->
 
<configuration>
 
  <property>
    <name>yarn.scheduler.capacity.maximum-applications</name>
    <value>10000</value>
    <description>
      Maximum number of applications that can be pending and running.
    </description>
  </property>
 
  <property>
    <name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
    <value>0.1</value>
    <description>
      Maximum percent of resources in the cluster which can be used to run
      application masters i.e. controls number of concurrent running
      applications.
    </description>
  </property>
 
  <property>
    <name>yarn.scheduler.capacity.resource-calculator</name>
    <value>org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator</value>
    <description>
      The ResourceCalculator implementation to be used to compare
      Resources in the scheduler.
      The default i.e. DefaultResourceCalculator only uses Memory while
      DominantResourceCalculator uses dominant-resource to compare
      multi-dimensional resources such as Memory, CPU etc.
    </description>
  </property>
 
  <property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>default</value>
    <description>
      The queues at the this level (root is the root queue).
    </description>
  </property>
 
  <property>
    <name>yarn.scheduler.capacity.root.default.capacity</name>
    <value>100</value>
    <description>Default queue target capacity.</description>
    </property>
 
  <property>
    <name>yarn.scheduler.capacity.root.default.user-limit-factor</name>
    <value>1</value>
    <description>
      Default queue user limit a percentage from 0.0 to 1.0.
    </description>
  </property>
 
  <property>
    <name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
    <value>100</value>
    <description>
      The maximum capacity of the default queue.
    </description>
  </property>
 
  <property>
    <name>yarn.scheduler.capacity.root.default.state</name>
    <value>RUNNING</value>
    <description>
      The state of the default queue. State can be one of RUNNING or STOPPED.
    </description>
  </property>
 
  <property>
    <name>yarn.scheduler.capacity.root.default.acl_submit_applications</name>
    <value>*</value>
    <description>
      The ACL of who can submit jobs to the default queue.
    </description>
  </property>
 
  <property>
    <name>yarn.scheduler.capacity.root.default.acl_administer_queue</name>
    <value>*</value>
    <description>
      The ACL of who can administer jobs on the default queue.
    </description>
  </property>
 
  <property>
    <name>yarn.scheduler.capacity.node-locality-delay</name>
    <value>-1</value>
    <description>
      Number of missed scheduling opportunities after which the
      CapacitySchedule attempts to schedule rack-local containers.
      Typically this should be set to number of racks in the cluster, this
      feature is disabled by default, set to -1.
    </description>
  </property>
 
</configuration>
Configuration Property: yarn.scheduler.capacity.maximum-applications

Maximum number of applications in the system which can be concurrently active both running and pending. Limits on each queue are directly proportional to their queue capacities and user limits. This is a hard limit and any applications submitted when this limit is reached will be rejected. Default is 10000.

This can be set for all queues with yarn.scheduler.capacity.maximum-applications and can also be overridden on a per queue basis by setting yarn.scheduler.capacity.<queue-path>.maximum-applications.

Integer value expected.

Our configuration is set to be the default of 10,000.

Configuration Property: yarn.scheduler.capacity.maximum-am-resource-percent

yarn.scheduler.capacity.maximum-am-resource-percent /
yarn.scheduler.capacity.<queue-path>.maximum-am-resource-percent

Maximum percent of resources in the cluster which can be used to run application masters – controls number of concurrent active applications. Limits on each queue are directly proportional to their queue capacities and user limits. Specified as a float – ie 0.5 = 50%. Default is 10%. This can be set for all queues with yarn.scheduler.capacity.maximum-am-resource-percent and can also be overridden on a per queue basis by setting yarn.scheduler.capacity.<queue-path>.maximum-am-resource-percent

Our configuration is left to be at the default of 10%; <value>0.1</value>.

Configuration Property: yarn.scheduler.capacity.resource-calculator

This is the ResourceCalculator implementation to be used to compare Resources in the scheduler.

The default i.e. DefaultResourceCalculator only uses Memory while DominantResourceCalculator uses dominant-resource to compare multi-dimensional resources such as Memory, CPU etc.

Our configuration is set to the default; <value>org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator</value>.

Configuration Property: yarn.scheduler.capacity.root.queues

The CapacityScheduler has a pre-defined queue called root. All queues in the system are children of the root queue.

Further queues can be setup by configuring yarn.scheduler.capacity.root.queues with a list of comma-separated child queues.

The configuration for CapacityScheduler uses a concept called queue path to configure the hierarchy of queues. The queue path is the full path of the queue’s hierarchy, starting at root, with . (dot) as the delimiter.

A given queue’s children can be defined with the configuration knob: yarn.scheduler.capacity.<queue-path>.queues. Children do not inherit properties directly from the parent unless otherwise noted.

Here is an example with three top-level child-queues a, b and c and some sub-queues for a and b:

<property>
  <name>yarn.scheduler.capacity.root.queues</name>
  <value>a,b,c</value>
  <description>The queues at the this level (root is the root queue).
  </description>
</property>
 
<property>
  <name>yarn.scheduler.capacity.root.a.queues</name>
  <value>a1,a2</value>
  <description>The queues at the this level (root is the root queue).
  </description>
</property>
 
<property>
  <name>yarn.scheduler.capacity.root.b.queues</name>
  <value>b1,b2,b3</value>
  <description>The queues at the this level (root is the root queue).
  </description>
</property>

Our configuration is set to a single queue named ‘default’ (queue path ‘root.default’).

Configuration Property: yarn.scheduler.capacity.root.default.capacity

yarn.scheduler.capacity.<queue-path>.capacity

Given the queue we have configured (named ‘default’), the <queue-path> is defined as ‘root.default’.

Queue capacity in percentage (%) as a float (e.g. 12.5). The sum of capacities for all queues, at each level, must be equal to 100. Applications in the queue may consume more resources than the queue’s capacity if there are free resources, providing elasticity.

Since we have only a single queue configured, it is set to 100%.

Configuration Property: yarn.scheduler.capacity.root.default.user-limit-factor

yarn.scheduler.capacity.<queue-path>.user-limit-factor

Given the queue we have configured (named ‘default’), the <queue-path> is defined as ‘root.default’.

The multiple of the queue capacity which can be configured to allow a single user to acquire more resources. By default this is set to 1 which ensures that a single user can never take more than the queue’s configured capacity irrespective of how idle the cluster is. Value is specified as a float.

Our configuration leaves this set at the default of <value>1</value>.

Configuration Property: yarn.scheduler.capacity.root.default.maximum-capacity

yarn.scheduler.capacity.<queue-path>.maximum-capacity

Given the queue we have configured (named ‘default’), the <queue-path> is defined as ‘root.default’.

Maximum queue capacity in percentage (%) as a float. This limits the elasticity for applications in the queue. Defaults to -1 which disables it.

Our configuration is set to be 100%; <value>100</value>.

Configuration Property: yarn.scheduler.capacity.root.default.state

yarn.scheduler.capacity.<queue-path>.state

Given the queue we have configured (named ‘default’), the <queue-path> is defined as ‘root.default’.

The state of the queue. Can be one of RUNNING or STOPPED. If a queue is in STOPPED state, new applications cannot be submitted to itself or any of its child queues. Thus, if the root queue is STOPPED no applications can be submitted to the entire cluster. Existing applications continue to completion, thus the queue can be drained gracefully. Value is specified as Enumeration.

Our configuration has this set to ‘RUNNING’.

Configuration Property: yarn.scheduler.capacity.root.default.acl_submit_applications

yarn.scheduler.capacity.<queue-path>.acl_submit_applications

Given the queue we have configured (named ‘default’), the <queue-path> is defined as ‘root.default’.

The ACL which controls who can submit applications to the given queue. If the given user/group has necessary ACLs on the given queue or one of the parent queues in the hierarchy they can submit applications. ACLs for this property are inherited from the parent queue if not specified.

Our configuration has this set to be ‘*’ indicating that any user may submit a job to this queue.

Configuration Property: yarn.scheduler.capacity.root.default.acl_administer_queue

yarn.scheduler.capacity.<queue-path>.acl_administer_queue

Given the queue we have configured (named ‘default’), the <queue-path> is defined as ‘root.default’.

The ACL which controls who can administer applications on the given queue. If the given user/group has necessary ACLs on the given queue or one of the parent queues in the hierarchy they can administer applications. ACLs for this property are inherited from the parent queue if not specified.

Our configuration has this set to be ‘*’ indicating that any user is able to administer this queue.

Configuration Property: yarn.scheduler.capacity.node-locality-delay

Number of missed scheduling opportunities after which the CapacityScheduler attempts to schedule rack-local containers. Typically this should be set to number of racks in the cluster, this feature is disabled by default, set to -1.

Our configuration has this set to be disabled; <value>-1</value>.

Configuration File: slaves

The final thing that needs to be done is to tell Hadoop about its slaves. For now, we have only one. Put the following in conf/slaves:

master

Configuration File: ~/.bashrc

The last thing that needs to be done is to put the following at the end of your .bashrc file. If you wish, you can put the corresponding environment variable definitions into the /etc/environment file instead; your choice.

export JAVA_HOME=/usr/lib/jvm/jdk
 
export HADOOP_PREFIX=/usr/local/hadoop
export HADOOP_HOME=/usr/local/hadoop
 
export HADOOP_COMMON_HOME=${HADOOP_HOME}
export HADOOP_HDFS_HOME=${HADOOP_HOME}
export HADOOP_MAPRED_HOME=${HADOOP_HOME}
export HADOOP_YARN_HOME=${HADOOP_HOME}
export YARN_HOME=${HADOOP_HOME}
 
export HADOOP_CONF_DIR=${HADOOP_PREFIX}/conf
export YARN_CONF_DIR=${HADOOP_PREFIX}/conf
 
export HADOOP_OPTS=-Djava.net.preferIPv4Stack=true

That concludes all of the configuration that is required. It is now possible to initialize and start the node and begin to execute tests against it.


Cluster Startup

Since we are only on one node, starting it up is pretty easy. The basic steps are:

  • Format the NameNode – This is done only once
  • Start the Distributed File System daemons
    • First the NameNode
    • Then the DataNode
    • Third, the SecondaryNameNode
  • Start the YARN daemons
    • First the ResourceManager
    • Then the NodeManager

At any point, you can check to see what is running by using the Java ps command:

jps

Format the NameNode

The command to format the NameNode is:

hdfs namenode -format

This is executed only one time.  If you execute it a second time, you will re-format the cluster and lose all of your data.

Note that the hdfs command is in /usr/local/hadoop/bin; this was added to the path in a previous step.

Start HDFS

There are two methods by which HDFS may be started. The first is:

start-dfs.sh

This first command aggregates the commands in the following alternate method:

hadoop-daemon.sh --config $HADOOP_CONF_DIR --script hdfs start namenode 
hadoop-daemon.sh --config $HADOOP_CONF_DIR --script hdfs start datanode
hadoop-daemon.sh --config $HADOOP_CONF_DIR --script hdfs start secondarynamenode

If at a later run, you get an error like this:

There appears to be a gap in the edit log. We expected txid 1, but got txid 705.

Append ‘-recover’ to the namenode command above. Check which resources are running by executing the Java ps command:

jps

You should see a NameNode and a DataNode listed along with Jps itself.

Start YARN

As with HDFS, there are two methods by which YARN may be started. The first is:

start-yarn.sh

This first command aggregates the commands in the following alternate method:

yarn-daemon.sh --config $HADOOP_CONF_DIR start resourcemanager 
yarn-daemon.sh --config $HADOOP_CONF_DIR start nodemanager

Running JPS should now list a NameNode, a DataNode, a ResourceManager, a NodeManager and JPS itself. To test the HDFS, you can issue the following command:

hdfs dfs -ls /

It shouldn’t return anything at this point. If it lists your local system files, re-check the fs.defaultFS setting. If everything works fine, go ahead and attempt to run the hadoop examples.


Executing an Example Job

Let’s get some files to upload to our DFS. We will get a few files from the Gutenberg project. See details here: http://www.gutenberg.org

cd /tmp 
mkdir gutenberg 
cd gutenberg 
wget http://www.gutenberg.org/cache/epub/20417/pg20417.txt 
wget http://www.gutenberg.org/cache/epub/5000/pg5000.txt 
wget http://www.gutenberg.org/cache/epub/4300/pg4300.txt

Now, let’s create a folder in our HDFS and upload the folder there.

hdfs dfs -mkdir -p /user/hadoop/ 
hdfs dfs -copyFromLocal /tmp/gutenberg /user/hadoop/ 
hdfs dfs -ls /user/hadoop/gutenberg

If you can see the three files listed properly, we’re all good to go here and we can now run the wordcount example on this.

cd /usr/local/hadoop 
find . -name "*examples*.jar" # see where the file is found and use it below 
cp share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar ./ 
hadoop jar hadoop-mapreduce-examples-2.2.0.jar wordcount /user/hadoop/gutenberg /user/hadoop/gutenberg-out

That should run for a bit and then produce something like the following at the end:

Map-Reduce Framework
   Map input records=77931
   Map output records=629172
   Map output bytes=6076101
   Map output materialized bytes=1459156
   Input split bytes=349
   Combine input records=629172
   Combine output records=101113
   Reduce input groups=82335
   Reduce shuffle bytes=1459156
   Reduce input records=101113
   Reduce output records=82335
   Spilled Records=202226
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=575
   CPU time spent (ms)=11290
   Physical memory (bytes) snapshot=902467584
   Virtual memory (bytes) snapshot=4101705728
   Total committed heap usage (bytes)=661127168
Shuffle Errors
   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0
   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0
File Input Format Counters
   Bytes Read=3671523
File Output Format Counters
   Bytes Written=880838

You can now do the usual dfs -ls on the output folder to check the result and then retrieve the output using the following commands:

hdfs dfs -getmerge /user/hadoop/gutenberg-out/part-r-00000 /tmp/gutenberg-wordcount
head /tmp/gutenberg-wordcount

or, the alternative commands:

hdfs dfs -copyToLocal /user/hadoop/gutenberg-out /tmp/
head /tmp/gutenberg-out/part-r-00000

The contents should look something like this:

hadoop@master:~$ head /tmp/gutenberg-wordcount
"(Lo)cra" 1
"1490     1
"1498,"   1
"35"      1
"40,"     1
"A        2
"AS-IS".  1
"A_       1
"Absoluti 1
"Alack!   1
hadoop@master:~$

That concludes this tutorial on the installation of a single-node pseudo-cluster of Hadoop V2.2.0 onto Ubuntu Linux.  In coming articles we’ll use what we’ve installed to teach ourselves MapReduce programming with Python and expand our single-node installation to become a mini-cluster of 3 machines.

If you run into trouble or have questions, please leave a comment and I’ll do my best to help you out.