An overview of Impala

As enterprises move to Hadoop based data solutions, a key pattern seen is that ‘bigdata’ processing happens in Hadoop land and the resultant derived datasets (such as fine grained aggregates) are migrated into a traditional data warehouse for further consumption. The reason this pattern exists is because, until relatively recently, Hadoop widely remained a batch processing system, which meant it was not very suitable for exploratory analytics that is common in standard Business Intelligence applications. The high latencies involved in batch oriented Hadoop jobs, and the gap vis-a-vis standard SQL compatibility were some of the main reasons for this. However, as it became clear that Hadoop was here to stay, both the Hadoop vendors (and community) and the enterprise data warehousing vendors realised the need to close these gaps.

Within the Hadoop community, this manifested itself in several independent efforts to bring low latency, SQL based query experience onto Hadoop. This included efforts to support standard SQL and low query latencies that users come to expect with a conventional database solution. The class of such solutions are generally referred to as “SQL on Hadoop” solutions. Cloudera’s Impala is a prominent player in this space. Other systems include the Stinger initiative led by Hortonworks, Hawq from Pivotal, Polybase from Microsoft, Presto from Facebook, etc.

This post is an overview of Impala. A tl;dr summary could be that Impala employs distributed query processing – a technique wherein a distributed query plan is generated and distributed to multiple nodes that process the data in parallel and aggregate final results to the user. Several other optimisations contribute to the speed up.

Impala Components

The following picture illustrates the main components in an Impala solution:

impala-arch

Broadly, the components can be organised into four layers:

Storage

This layer contains components that store the data to be queried. The components here are not really Impala components, but the regular components that store data in Hadoop, such as the Datanodes of HDFS and HBase region servers.

Query Engine 

This is the most important layer in the solution.

The query engine comprises of the core Impala component, called ImpaladThis daemon is primarily responsible for executing queries. In a cluster, typically there are as many daemons as datanodes. Impala has a peer-to-peer architecture, i.e. there is no designated master. Clients can connect to any daemon to issue queries. The daemon to which a query is issued is called the ‘coordinator‘. The coordinator distributes the work of executing a query to multiple Impala daemons, and aggregates results from them to return to the client.

In their turn, the participating peer Impala daemons perform the function of executing the query fragment on the data assigned to them, and sending the results back to the coordinating Impala daemon.

The statestored is another Impala component that tracks the availability of Impala daemons, and transmits their health status to other daemons. The Impala daemons use this information to avoid trying to distribute query work to unreachable or unhealthy nodes.

Metadata 

This layer contains components that hold information about the data.

The Namenode is the HDFS master that holds information about the  data stored on datanodes.

The Hive metastore is another important component in an Impala cluster. This is the component that provides Impala with information on the SQL schemas – the databases, tables, column definitions, storage formats, etc – on the basis of which queries are compiled and executed. This implies that Impala can query tables that are already defined in Hive. Metadata of any new tables created via Impala is also stored in the same metastore.

The catalogd daemon is another core Impala component that is responsible for synchronising metadata information amongst the Impala daemons and between the other metadata components and Impala daemons.

Consumption Layer 

The consumption layer includes components that fire queries and retrieve back results. The Impala shell is akin to any SQL shell utility. Using it, one can connect to an Impala daemon and fire queries at it. The JDBC / ODBC drivers enable connectivity for programmatic integrations, such as from custom web applications, or existing Business Intelligence solutions. Hue is a graphical UI tool from Cloudera that provides a SQL workbench like functionality.

Collaborations

Let us consider collaborations between Impala components in the context of two main flows: executing a query, and synchronising metadata.

Executing a query

  • As described earlier, the Impala client connects to an Impala daemon to issue the query. This Impala daemon logically becomes the ‘coordinator’ for this query
  • The daemon analyses the query, generates a distributed query plan, and breaks the plan up so as to distribute work amongst its peers.
  • In order to cause an efficient distribution, the daemon relies on metadata information that it would have cached from interactions with the catalogd service. This metadata information will, for example, talk about block locations on datanodes where the data is located.
  • Work is now distributed to multiple Impala daemons. The coordinator relies on information from statestored to know which daemons are unhealthy and avoid sending work to them.
  • The worker Impala daemons will execute the partial queries assigned to them.
  • In the process, they read data from the underlying storage systems – i.e. datanodes or HBase. For reading from datanodes, one interesting point is that they use a HDFS feature called “short-circuit reads“. When enabled, this feature allows HDFS clients co-located with a datanode to read directly using a Unix domain socket shared between the datanode and the client, rather than go the way of a typical Hadoop RPC call for read. Details of this feature are here.
  • Once each daemon has executed its part of the query, the results are sent back to the coordinator, which aggregates the results and returns them back to the client.

Synchronising metadata

  • The catalogd process is responsible for looking up metadata information from HDFS and Hive metastore, and broadcasts it to the Impala daemons at startup time.
  • The daemons cache this information, and use it for query planning as described above.
  • When a DDL operation is issued by an Impala client, this is handled by the catalogd process, which updates Hive metastore and also broadcasts changes to other daemons.
  • As Impala can also query tables defined using Hive, it needs to synchronise changes that happen via Hive as well. Currently, there is no automated way of doing this. The user needs to issue a special command to invalidate or refresh metadata.
  • When these synchronisation commands are issued, again the catalogd process manages this globally, refreshes the metadata from underlying systems and broadcasts changes to other daemons.

Conclusion

The Hadoop community and vendors are striving to make low latency SQL on Hadoop a reality. In Cloudera’s case, Impala offers to achieve this via several features such as distributed query processing, minimising avoidable I/O by eliminating the slow Map/Reduce model of traditional Hadoop, optimisations like HDFS short circuit reads, aggressive caching of metadata, relying on table and column statistics, and so on. The success of these platforms will go a long way in simplifying access to big data stored on Hadoop systems.

References

Most of this information is from Cloudera’s documentation of Impala.

For clarifying parts of the interactions between Impala components, chiefly on synchronising metadata, I had written on the impala-user mailing list, and received helpful responses from Marcel Kornacker, the architect of Impala.

Advertisements
Tagged with: , , , ,
Posted in Hadoop, Technical

A couple of package management tricks

Few weeks back, I spent some time trying out Cloudera Impala. The target cluster was a 15 node cluster of CentOS VMs running CDH 4.5. The idea of the trial was to gain experience installing and running Impala on a dataset for a client project, and see how it measures against our use cases. I will hopefully cover the results of the trial in a future post.

In this post, I am going to write about two package management tricks I learnt as part of the installation. The tricks are not specific to installing Impala, and hence will be widely applicable. Therefore, I hope this post will be of some use to folks reading it.

Repeating a multi-file install on multiple machines

The first task was, of course, to install Impala on multiple machines. Using Cloudera Manager would possibly have made this task easier. However, we were not using it. If we were setting up from scratch, we could also have set up one machine and cloned it. However, the cluster was not only setup, but was being used in parallel for other purposes and we did not want to disturb the existing set up.

To install Impala on these CentOS machines, I had to use the yum package management tool, and point to Cloudera’s yum repositories. The install was installing more than 100 new packages and upgrading more than 10 of them from the Internet – overall a lot of bits to pull down onto multiple machines. I wanted to do this more efficiently. This led me to learn about setting up local yum repositories.

A local yum repository is a local directory containing all the required packages. This link had all the necessary background material to understand how to set one up. In order, I did the following:

  • Install a plugin to yum called yum-downloadonly.
  • With this, execute a command to just download, but not install, the RPM files:

yum install <package> –downloadonly

  • Run the ‘createrepo’ command on the directory containing the downloaded RPM files, like so:

createrepo <path to downloaded packages>

  • Create a yum configuration file pointing to this folder. This file can look like below:

[local_impala_repo]
name=Local Repository for Impala Packages
baseurl=file://<full_path_to_downloaded_packages>/
enabled=1
gpgcheck=0

  • Copy this configuration file to the location of yum configuration files – typically /etc/yum.repos.d
  • Trigger an install as follows:

yum –disablerepo=”*” –enablerepo=”local_impala_repo” install

The –disablerepo=”*” was required to prevent yum from using existing public repositories under /etc/yum.repos.d for the default packages. –enablerepo option enables the local repository set up above, making it the only source of packages available to yum.

Note that if the path to which the RPM files are copied is a path accessible from all machines – like an NFS mounted file system – you can download the files just once and use it from all machines.

Extracting files from a package

Impala requires Hadoop’s native library, libhadoop.so, for its functioning. The systems I was installing Impala on had CDH installed using tarballs which do not contain the native libraries. Hence, I needed to get these separately and install them.  Since these are native libraries, one needs to be careful to get the versions compatible with the system being used. The best way to do this is to build from source. If you are lazy like me, you may want to get these from a compatible RPM like the ones available here (for CentOS 6, 64-bit, CDH 4.5). The task becomes a little more interesting if some of the files provided by the RPM are already installed on the system (through other means). This being the case, I wanted to see if there was a way to extract files from the RPM without actually installing it, so that I could just pick what I wanted. Which led me to learn about cpio and rpm2cpio.

The cpio utility copies files between archives of a set of supported archive types. Like many standard Unix tools, it is also capable of reading archive files from standard input and write archives to standard output. One of the supported archive types is the ‘cpio’ format itself.

The rpm2cpio utility extracts files provided by the RPM into the cpio format that is written out to standard output. Composition is straightforward now:

rpm2cpio <name of rpm> | cpio -idm

This pipeline extracts all files provided by an RPM relative to the current working directory using the cpio format as an intermediate transformation format.  One can now easily pick the specific files required from the extracted contents.

In summary, the two tricks I learnt over the course of a day were immensely helpful in getting a working Impala installation on multiple machines. A reliable search engine, tools that follow Unix’s design principles of composition, configurability and usability, and immensely helpful documentation from unknown authors (in the form of blogs, howtos, etc.) who took the time to document these tidbits for the benefit of others made this possible and easy. Of these, I’d rate the documentation as the most important. And this post is hopefully an attempt to pay forward the gratitude.

Tagged with: , ,
Posted in Technical

Comparing Apache Tez and Microsoft Dryad

Hortonworks has been blogging about a framework called Tez, a general purpose data processing framework. Reading through the posts, I was reminded of a similar framework that had come from Microsoft Research a while back called Dryad. This blog post is an attempt at comparing them.

In order to structure the comparison, I am trying to express the points under the following topics: historical perspective, features, concepts, and architecture.

Historical Perspective

Both Tez and Dryad define distributed, data parallel computing frameworks that lay an emphasis on modelling data flow. A data processing ‘job’ in either is defined as a graph. The vertices of the graph represent computational processes, with the edges connecting them describing input they receive and output they send out from / to other computational vertices or data sources / sinks. Both systems attempt to provide an efficient execution environment for running these jobs, abstracting users away from needing to handle common distributed computing requirements such as communication, fault tolerance, etc.

At the time of its introduction, Dryad was possibly Microsoft’s view on how to build such a framework from ground up. In contrast to Hadoop, Dryad attempted even then to provide a framework that wasn’t restricted to just one model (MapReduce) of computation. Dryad was inspired by a variety of data processing systems including MPP databases, data parallel programs on GPUs, and MapReduce as well. It attempted to build a system that could express all these kinds of computation.

Tez was introduced as a generalisation of the MapReduce paradigm that had dominated Hadoop computation for several years. However, it seems to be inspired more by data flow frameworks like Dryad. It was enabled immensely by the separation of concerns brought to the Hadoop MapReduce layer in the form of Apache YARN, that separated cluster resource management from distributed job management, enabling more models than just MapReduce. A direct motivation for Tez was the Stinger initiative, launched to build a faster version of Apache Hive. Specifically, the idea was to enable expressing a HQL query as a single Tez job, rather than multiple MapReduce jobs, thereby avoiding the overhead of launching multiple jobs and also incurring the I/O overhead of having to store data between jobs on HDFS.

Features

Tez and Dryad share several features, such as:

  • The DAG model being the specification choice for a job
  • A flexible / pluggable system where the framework tries to give the user control of the computation, nature of input and output, etc.
  • Supporting multiple inputs and outputs for a vertex (that enable SQL like joins to be expressed, and various forms of data partitioning like the shuffle sort phase of Hadoop MapReduce)
  • An ability to modify the DAG at runtime based on feedback from executing part of the graph. The runtime modification is primarily used for improving the efficiency of execution in both systems. For e.g. in Dryad, this was used to introduce intermediate aggregator nodes (akin to the combiner concept in Hadoop MapReduce), while in Tez, this is being used as a way to optimise the number of reducers or when they would get launched.

Dryad was built from ground up without a supporting resource management or scheduling framework, and some of its ‘features’ are present in or shared by other layers of the Hadoop stack like YARN. In addition to those, Dryad allowed one specific optimisation through which processing nodes can execute concurrently, co-located and connected via  shared memory or pipes.

Tez on its hand, expands on learnings from the Hadoop MapReduce framework. For example, it expands on a feature available with MapReduce called JVM reuse, whereby ‘containers’ launched to run the vertex programs of Tez can be reused for multiple Tez tasks. It even allows sharing data between these tasks via an ‘Object Registry‘ without needing to have them run concurrently.

Concepts

Naturally, the core concepts of a Graph are common between the systems.

In Tez:

  • A vertex is defined by the input, output and processor associated with it.
  • The logical and physical manifestations of a graph are explicitly separated. Specifically, the inputs and outputs are of two types – a physical type and a logical type. The logical type describes the connection between a vertex pair as per the DAG definition, while the physical type will represent the connection between a vertex pair at runtime. The Tez framework automatically determines the number of physical instances of a vertex in a logical graph.
  • Edges are augmented with properties that relate to data movement (for e.g. multicast output between connected vertices), scheduling (co-schedule, or in sequence) and data source (persistence guarantees on the vertex’s output). Tez expects that by using a combination of these properties, one can replicate existing patterns of computation like MapReduce.
  • In addition to the graph concepts, there is also the concept of an ‘event’. Events are a means for the vertices and the framework to communicate amongst themselves. Events can be used to handle failures, learn about the runtime characteristics of the data or processing, or indicate the availability of data.

In Dryad:

  • Inputs and outputs are considered vertices just like processing vertices.
  • Dryad represents the logical representation of the DAG as a set of ‘stages’. However, this does not seem to be a first class concept to specify the DAG at definition time. Specifically, Dryad expects the specific number of instances of  a vertex at runtime to be defined at definition time.
  • A lot of operators are defined which help to build a graph. For instance:
    • Cloning: is an operation by which a given Vertex is replicated. Such a cloning operation is used to define a physical manifestation of a graph.
    • Composition: is used to define types of data movement patterns (akin to the edge property in Tez)  like round robin data transfer, scatter-gather etc.
    • Merge: is used for defining operations like fork/join etc.
    • Encapsulation: is a way of collapsing a graph into a single vertex, which makes it execute on a single node – used to express concurrent, co-located execution.
  • It appears the idea behind the operators is again to try and define patterns of computation like MapReduce.
  • A ‘channel’ is an abstraction of how data is transferred along an edge. There is support for different types of channels like File, Shared Memory, Pipes etc. This is similar to the physical Input/Output types in Tez.

Architecture

Tez is a YARN application. A Tez job is coordinated by the Tez Application Master (AM). It is comprised of Tez tasks. Each task encapsulates a processor (vertex) of the DAG and all inputs and outputs connected to it. A Tez task is launched within a YARN container. However, in the interest of providing good performance, a single YARN container could be reused for multiple Tez tasks. This is managed by a ‘TezTask’ host. The host also manages a store of objects that can be shared between Tez tasks that run within the host.

The Tez Application Master has a Vertex Manager plugin (that can be customised by the developer) for every type of Vertex. In addition, the AM also maintains a Vertex State Machine. As the state of the DAG changes, the Vertex Manager is invoked by the Application Master, who can then act on the State machine to customise the graph execution.

Another point to note is that Tez relies on YARN’s resource manager and scheduler for initial assignment of containers, etc. However, it has the ability to make the scheduling a two level activity. For example, Tez does come with scheduling capabilities, which it uses for features like container reuse.

Dryad’s architecture includes components that do resource management as well as the job management. A Dryad job is coordinated by a component called the Job Manager. Tasks of a job are executed on cluster machines by a Daemon process. Communication with the tasks from the job manager happens through the Daemon, which acts like a proxy. In Dryad, the scheduling decisions are local to an instance of the Dryad Job Manager – i.e. it is decentralised.

The logical plan for a Dryad DAG results in each vertex being placed in a ‘Stage’. The stages are managed by a ‘Stage manager’ component that is part of the job manager, similar to the Vertex Manager in Tez. The Stage manager is used to detect state transitions and implement optimisations like Hadoop’s speculative execution.

Conclusion

Dryad was discontinued by Microsoft in late 2011. Microsoft has since been contributing to Hadoop. Given the similarities between the two systems, a question is about how Tez’s prospects are going to be different from Dryad. A few points that seem to favour Tez, IMO:

  • Tez rides on years of learning from Hadoop MapReduce and other systems including Dryad. Microsoft recently posted that it contributes to Tez. The expectation then would be that the insights and learnings from systems (including what did not work) will help build a better system.
  • The separation of concerns brought about by YARN potentially helps Tez to focus on problems specific to the graph processing model, while delegating resource management and scheduling decisions to another layer – at least partially.
  • The API for Graph construction in Tez appears a lot simpler and intuitive to understand than the corresponding one in Dryad. Hence, it seems easier to adopt the model from a programmer perspective.
  • Given Tez was launched with a specific initiative of making Hive faster, there is a goal it is working towards, and there seems to already be evidence that Tez is enabling improvements in Hive as shown here.

Personally, I feel it would be good to have Tez succeed and several people who have invested in Hive will be able to see huge improvements in performance from their existing applications.

Acknowledgements

Most of the information for this post has come from the publicly available knowledge in blog posts and published paper. If there is any omission or mis-representation, please do let me know !

An initial draft of this post was reviewed by a few committers at Hortonworks: Siddharth Seth, Bikas Saha, Hitesh Shah and Vinod Kumar Vavilapalli. I am thankful to them for their feedback. While I have incorporated some of it, I felt some others are best explained from their end, possibly as comments. I will notify them once the blog is published.

Specifically calling out two points:

  • Both Sid and Hitesh have called out that there are going to be additional changes to the architecture and features in Tez that will soon be published. As this blog was being written, a new post came out from Hortonworks mentioning a new concept called Tez Sessions. So, be sure to watch out for Hortonworks blogs on Tez for more information.
  • Bikas provided feedback about Tez’s motivation being closer not just to systems like Dryad, but also other data flow systems like Hyracks and Nephele. It may be a good academic exercise to see these other systems as well from a perspective of learning.
Tagged with: , , , , ,
Posted in Hadoop, Technical

Using native Hadoop shell and UI on Amazon EMR

Amazon’s Elastic MapReduce (EMR) is a popular Hadoop on the cloud service. Using EMR, users can provision a Hadoop cluster on Amazon AWS resources and run jobs on them. EMR defines an abstraction called the ‘jobflow’ to submit jobs to a provisioned cluster. A jobflow contains a set of ‘steps’. Each step runs a MapReduce job, a Hive script, a shell executable, and so on. Users can track the status of the jobs from the Amazon EMR console.

Users who have used a static Hadoop cluster are used to the Hadoop CLI for submitting jobs and also viewing the Hadoop JobTracker and NameNode user interfaces for tracking activity on the cluster. It is possible to use EMR in this mode, and is documented in the extensive EMR documentation. This blog collates the information for using these interfaces into one place for such a usage mode, along with some experience notes. The examples given here have been tested on OS X MountainLion. The details will vary for other operating systems, but should be similar.

When a cluster is provisioned, a node called the ‘master’ node is created on the EMR cluster, that runs the JobTracker and the NameNode. In short, the mechanism to access the Hadoop CLI is to ssh into the master node and use the installed Hadoop software. Likewise, for accessing the UI, an SSH tunnel needs to be set up to the web interfaces that also run on the master node.

Before we start

  • This blog assumes you already have signed up for an Amazon account.
  • Next, you need the Amazon EMR CLI. The CLI is a useful way to access Amazon EMR that provides a good balance between completeness of functionality and ease of use, compared to the Amazon EMR console.

Note: The EMR CLI is a Ruby application requiring Ruby 1.8.x. If you have installed Ruby 1.9 on your system, you have two options:

    • Use RVM, install Ruby 1.8.x. via RVM and switch the Ruby version. 
    • Alternatively, you can use a fork of the Amazon CLI that works with Ruby 1.9 called elastic-mapreduce-ruby.
  • Configure the EMR client by creating a credentials.json file. The file should be created in the home directory of your elastic-mapreduce client. A sample file’s contents is mentioned here:
{
 "access_id": "XXXXXXXXXXXXXXXXX",
 "private_key": "YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY",
 "keypair": "keypair-name",
 "key-pair-file": "/path/to/keypair-file.pem",
 }
  • In the example above, access_id and private_key are security credentials connected with your AWS account. The keypair is a resource associated to your Amazon EC2 service. The private key details of the keypair can be downloaded to a PEM file locally, whose path is specified in key-pair-file. You can refer to this link for details on how to get these various security parameters.

Launching a cluster

Once set up, we are now ready to launch an EMR cluster and access it to submit Hadoop jobs.

Typical deployments of a Hadoop cluster comprise of three types of nodes – the masters (JobTracker and NameNode), the slaves (TaskTrackers and DataNodes) and client nodes (typically called access nodes or gateways) from where users submit jobs. An EMR cluster too has three types of nodes – a single master node (that runs both the JobTracker and NameNode), core nodes (that run both TaskTrackers and DataNodes) and task nodes (that run only the TaskTrackers). As you can see, the categories of nodes are slightly different. But we could double up the master node in EMR to be a client node as well. Since the master node in an EMR cluster is used only by you, it is assumed it will have enough capacity to run the Hadoop CLI.

Start an EMR cluster with the CLI in this fashion:

./elastic-mapreduce --create  --alive --visible-to-all-users --num-instances 3 --slave-instance-type m1.small --master-instance-type m1.small --name blog-test

The ‘elastic-mapreduce’ program will be in the home directory of the EMR client. The above command creates a cluster with 3 instances – 1 master and 2 slaves. The –alive flag ensures that the launched cluster stays alive until it is manually terminated by the user. This option is required to login into the master node and submit Hadoop jobs to the cluster directly. The output of this command will be a ‘jobflow’ ID – something that will look like j-FTW6FLQ1P2G0. Make a note of this ID, as we will use it in other commands below.

Note: You will be charged according to the EMR rates for your usage of the cluster, depending on the type and number of instances chosen.

You can also get more details about the launched cluster using the following command:

./elastic-mapreduce --describe -j <jobflow-id>

Of specific interest in the output of the above command will be the “State” attribute of the ExecutionStatusDetail node. You need to wait until the value of this attribute becomes “WAITING”. Once this state is reached, your cluster is ready for further action. Similarly, the “MasterPublicDnsName” attribute gives the DNS name of the machine running the Hadoop JobTacker and NameNode.

Browsing Hadoop web UI on the cluster

At this point, it will be useful to check out how the cluster looks like from the familiar JobTracker and NameNode web UI.

The JobTracker web server runs on port 9100 on the master node. In order to browse this UI, you can set up an SSH tunnel that will work as a SOCKS proxy server from your machine to the master node that dynamically forwards HTTP requests. The Amazon CLI provides a command to set up this proxy:

./elastic-mapreduce -j <jobflow-id> --socks

Note: This command helpfully prints out the SSH command that is invoked to set up the SOCKS proxy server. When facing problems with connectivity, I have been able to take this SSH command, modify it (for instance to add the -v option to ssh for enabling debug output) and run it directly for debugging / resolving issues.

After the socks proxy server is started, you can configure your browser’s or system’s proxy settings to use this SOCKS proxy for HTTP traffic. For Chrome, I did this by launching Settings > Change Proxy Settings > Select “SOCKS Proxy” > Enter the server address as 127.0.0.1 and the port as 8157.

socks-proxy

If things are fine here, you should be able to hit the URL http://<MasterPublicDnsName&gt;:9100/ on the browser and see the JobTracker UI page and http://<MasterPublicDnsName&gt;:9101/ for the NameNode UI.

The method described here routes all HTTP traffic from your browser through the tunnel. A better option would be to set up a rule that routes only traffic to the EMR clusters through the tunnel. Such a method is described in the EMR documentation, using the FoxyProxy Firefox browser plugin.

Submitting Jobs

The next step is to submit jobs to the EMR cluster using the Hadoop CLI. As described above, the master node doubles up as a Hadoop client node as well. So, we should SSH into the master node using the following command:

./elastic-mapreduce -j <jobflow-id> --ssh

This command should drop you into the home directory of the ‘hadoop’ user on the master node. A quick listing of the home directory will show a full Hadoop installation, including bin and conf directories and all the jars that are part of the Hadoop distribution.

You can execute your Hadoop commands as usual from here. For e.g.

./bin/hadoop jar hadoop-examples.jar pi 10 100

Using the UI that we set up in the previous step, you can also browse the job pages as usual.

Terminating the cluster

Once you are done with the usage, you must remember to terminate the cluster, as otherwise you will continue to accrue cost on an hourly basis irrespective of usage. The command to terminate is:

./elastic-mapreduce --terminate -j <jobflow-id>

Note: Terminating the cluster will result in loss of data stored in the HDFS of the cluster. If you need to store data permanently, you can do so by providing Amazon S3 paths as output directories of your jobs.

In conclusion, tools like the elastic mapreduce CLI make it easy to not only provision a Hadoop cluster and run jobs on it, but also to provide the familiar look and feel of the Hadoop client and the Web UI.

Tagged with: ,
Posted in Cloud, Hadoop, Uncategorized

Elephants in the Clouds

Over the past one year, there have been a lot of new product / project announcements related to running Hadoop in Cloud environments. While Amazon’s Elastic MapReduce continued with enhancements over its base platform, players like Qubole, Mirantis, VMWare, Rackspace all announced product or service offerings marrying the elephant with the cloud. Projects like Apache Whirr, Netflix’s Genie started getting noticed. To me, it has become evident that Hadoop in the cloud is a trending topic. This post explores some of the reasons why this association makes sense, or why customers are seeing increased value in this model.

Why Hadoop on the Cloud makes sense

Usual Suspects

Running Hadoop on the cloud makes sense for similar reasons as running any other software offering on the cloud. For companies still testing waters with Hadoop, the low capacity investment in the cloud is a no-brainer. The cloud also makes sense for a quick, one time use case involving BigData computation. As early as in 2007, New York Times used the power of Amazon EC2 instances and Hadoop for just one day to do a one time conversion of TIFF documents to PDFs in a digitisation effort. Procuring scalable compute resources on demand is appealing as well.

Beating The Red Tape

The last point above of quick resource procurement needs some elaboration. Hadoop and platforms it was inspired from made the vision of linear storage and compute using commodity hardware a reality. Internet giants like Google, who always operated at web-scale, knew that there would be a need for running on more and more hardware resources. They invested in building this hardware themselves.

In the enterprises though, this was not necessarily an option. Hadoop adoption in some enterprises grew organically from a few departments running tens of nodes to a consolidated medium sized or large cluster with a few hundreds of nodes. Usually such clusters started getting managed by a ‘data platform’ team different from the Infrastructure team, the latter being responsible for procurement and management of the hardware in the data centre. As the analytics demand within enterprises grew, the need to expand the capacity of the Hadoop clusters also grew. The data platform teams started hitting a bottleneck of a different kind. While the software itself had proven its capability of handling linear scale, the time it took for hardware to materialise in the cluster due to IT policies varied from several weeks to several months, stifling innovation and growth. It became evident that ‘throwing more hardware at Hadoop’ wasn’t as easy or fast as it should be.

The cloud, with its promise of instant access to hardware resources, is very appealing to leaders who wish to provide a platform that scales fast to meet growing business needs.

Handling Irregular Workloads

Being a batch oriented system, typical usage patterns of Hadoop involve scheduled jobs processing new incoming data on a fixed, temporal  basis. The load on compute resources of a Hadoop cluster varies based on the timings of these scheduled runs or rate of incoming data.  A fixed capacity Hadoop cluster built on physical machines is always on whether it is used or not – consuming power, leased space, etc. and incurring cost.

The cloud, with its pay as you use model, is more efficient to handle such irregular workloads. Given predictability in the usage patterns, one can optimise even further by having clusters of suitable sizes available at the right time for jobs to run.

Handling Variable Resource Requirements

Not all Hadoop jobs are created equal. While some of them require more compute resources, some require more memory, and some others require a lot of I/O bandwidth. Usually, a physical Hadoop cluster is built of homogenous machines, usually large enough to handle the largest job.

The default Hadoop scheduler has no solution for managing this diversity in Hadoop jobs, causing sub-optimal results. For example, a job whose tasks require more memory than average could affect tasks of other jobs that run on the same slave node due to a drain on system resources. Advanced Hadoop schedulers like the Capacity Scheduler and Fairshare Scheduler have tried to address the case of managing heterogenous workloads on homogenous resources using sophisticated scheduling techniques. For e.g. the Capacity Scheduler supported the notion of ‘High RAM’ jobs – jobs that require more memory on average. Such support is becoming more prevalent with Apache YARN, where the notion of a resource is being more comprehensively defined and handled. However, these solutions are still not as widely adopted as the stable Hadoop 1.0 solutions that do not have this level of support.

Cloud solutions meanwhile already offer a choice to the end user to provision clusters with different types of machines for different types of workloads. Intuitively, this seems like a much easier solution for the problem of handling variable resource requirements.

Simplifying Multi-tenancy Requirements

As cluster consolidation happens in the enterprise, one thing that gets lost out is the isolation of resources for different sets of users. As all user jobs get bunched up in a shared cluster, administrators of the cluster start to deal with multi-tenancy issues like user jobs interfering with one another, varied security constraints etc.

The typical solution to this problem has been to enforce very restrictive cluster level policies / limits that prevent users from doing anything harmful to other users jobs. The problem with this approach is that valid use cases of users are also not solved. For instance, it is common for administrators to lockdown the amount of memory Hadoop tasks can run with. If a user genuinely requires more memory, he / she has no support from the system.

Using the cloud, one can provision different types of clusters with different characteristics and configurations, each suitable for a particular set of jobs. While this frees administrators from having to manage complicated policies for a multi-tenant environment, it enables users to use the right configuration for their jobs.

Running Closer to the Data

As businesses move their services to the cloud, it follows that data starts living on the cloud. And as analytics thrives on data, and typically large volumes of it, it makes no sense for analytical platforms to exist outside the cloud leading to inefficient, time consuming migration of this data from source to the analytics clusters.

Running Hadoop clusters in the same cloud environment is an obvious solution to this problem. This is, in a way, applying Hadoop’s principle of data locality at the macro level.

These are some of the reasons why Hadoop clusters on the cloud make sense, and how they open up new possibilities for platform providers, administrators and end users.

The other side of the coin

While the above are some good reasons for considering the cloud, it is not so that one can just flip over and start using the new environment. There are some specific issues that make running Hadoop jobs on the cloud more challenging than other common workloads. I hope to cover these in an upcoming blog post.

Tagged with: ,
Posted in Hadoop, Technical

Taking memory dumps of Hadoop tasks

On the Hadoop users mailing list, I was recently working with a user on a recurring problem faced by users of Hadoop. The user’s tasks were running out of memory and he wanted to know why. The typical approach to debug this problem is to use a diagnostic tool like jmap, or hook up a profiler to the running task. However, for most Hadoop users, this is not feasible. In real applications, the tasks run on a cluster to which users do not have login access and they cannot debug the task as it runs. I was aware of an option that is provided by the JVM – -XX:+HeapDumpOnOutOfMemoryError that can be passed to the task’s Java command line. This option makes the JVM to take a dump when the task runs out of memory. However, the dump is saved by default to the current working directory of the task, which in Hadoop’s case is a temporary scratch space managed by the framework and it would become inaccessible once the task completes.

At this point, when we were pretty much giving up on options, Koji Noguchi, an ex-colleague of mine and a brilliant systems engineer and Hadoop debugger, responded with a way out. You can read about it here. The few lines mentioned feel almost like magic, so I thought I would write an explanation about how it is working, for people who are interested.

The requirement is to save the dump to a location which is accessible by the user running the task. Such a location, in Hadoop’s case, is … HDFS. So, what we want is to be able to save the dump to HDFS when the scenario happens.

To get there, the first observation is that the JVM offers hooks that can be passed to the task’s command line, for us to act on an OutOfMemory scenario. The options Koji used are:

-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=./heapdump.hprof -XX:OnOutOfMemoryError=./copy-dump.sh

These options instruct the JVM to take a dump on OutOfMemory, save it with a name of our choice, and importantly, run a custom script . It should be obvious now that the custom script can copy the generated dump to DFS, using regular DFS commands.

There are a couple of gaps to plug though – as the devil is in the details. How does the script (copy-dump.sh, in the example above) get to the cluster nodes ? For that, we use Hadoop’s feature – Distributed Cache. This feature allows arbitrary files to be packaged and distributed to cluster nodes where tasks require them – using HDFS as an intermediate store. The cached files are usually available to tasks via a Java API. However, since we need this outside the task’s context, we use a powerful option of the Distributed Cache – creating symlinks. This option not only makes the files available to the tasks via the API, but also creates a symbolic link of that file into the current working directory of the task. Hence, when we refer to the script in the task’s command line, we can refer to it relative to the current working directory of the task, i.e. ‘.’.

The specific options to set up all of the above are as follows:

conf.set("mapred.create.symlink", "yes");
conf.set("mapred.cache.files", "hdfs:///user/hemanty/scripts/copy_dump.sh#copy_dump.sh");
conf.set("mapred.child.java.opts","-Xmx200m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=./heapdump.hprof -XX:OnOutOfMemoryError=./copy_dump.sh");

The last detail in the whole solution is about how to save the dump on HDFS with a name that is unique. Because, realize that multiple tasks are running together at the same time, and more than one of them could run out of memory. Koji’s scripting brilliance for solving this problem was to use the following script:

hadoop fs -put heapdump.hprof /tmp/heapdump_hemanty/${PWD//\//_}.hprof

The expression ${PWD//\//_} takes the current working directory of the task (from the environment), and replaces every occurrence of ‘/’ with an ‘_’. Nice !

So, using these options, features and diagnostics of Hadoop and the JVM, users can now get memory dumps of their tasks to locations that they can easily access and analyse. Thanks a lot to Koji for sharing this technique, and happy debugging !!!

Tagged with: ,
Posted in Hadoop, Technical

Hadoop filesystem shell subcommand variants

At a client location where I am working as a Hadoop consultant, an upgrade is being planned from a Hadoop 0.20 based version to a Hadoop 2.0 based version. Through these versions, there have been some changes to the hadoop fs subcommand and users have been asking about what the differences are among the variants. This post aims to answer some of those questions.

There are three variants that exist currently:

  • hadoop fs
  • hadoop dfs
  • hdfs dfs

All of these commands provide a CLI to the configured filesystem in Hadoop, and have options that mimic various file operations like ls, put, get, mv, cat, etc. To folks who have played around with all of these, the results would have appeared more or less the same, barring a deprecated warning message in some cases.

The first thing to note is that hadoop dfs was the original shell subcommand. The variant hadoop fs was introduced via HADOOP-824. As mentioned in the JIRA, the change was made to reflect the nature of Hadoop’s generic FileSystem API – which provides a service provider like interface that can be implemented by any filesystem implementation. Indeed, there are multiple filesystems supported by Hadoop – including the local filesystem, HDFS (Hadoop’s native distributed filesystem), Amazon S3 and so on. For some reason – most likely, backwards compatibility – the hadoop dfs subcommand was not deprecated nor removed as part of this change. Hence, from this point onwards, both the subcommands could be used to access the same underlying filesystem.

Sometime after Hadoop 0.20, the community made a decision to split the Hadoop project into three separate projects – common, hdfs and mapreduce. While this split itself was partially reverted due to various reasons, a few changes made as part of the split were retained in the codebase. Specifically, HADOOP-4868 split the single hadoop shell script into two other scripts, including hdfs. As part of this split, subcommands that seemed relevant to the distributed filesystem alone were moved to the hdfs script. The dfs subcommand was also moved there, although the fs subcommand was retained in the hadoop script itself. hadoop dfs was supported by calling out to the hdfs script, and printing a deprecation warning.

One notable point is that in order to execute any filesystem commands, the classpath needs to refer to the classes that implement the filesystem. In Hadoop, as of this writing, all filesystem implementations, other than HDFS, sit in the hadoop-common jar. The HDFS implementation, on the other hand, lives in its own jar. An important decision made by the community in HADOOP-4868 was to include the HDFS jars in the classpath of the hadoop script, so that the fs subcommand can interact with an underlying HDFS filesystem – which is the most typical installation of Hadoop. The hdfs script also shares its classpath configuration with the hadoop script.

What that really means is that as far as HDFS is concerned, hadoop fs and hdfs dfs are both properly supported usages without any difference. Even for the other file systems, currently the two variants behave the same way. However, it is not unimaginable that in future, other filesystem implementations get out of the hadoop-common jar, and the hdfs script then doesn’t  refer to other filesystem implementations, thereby making hdfs dfs an exclusive HDFS only subcommand.

My personal preference would be to use hadoop fs as it is more generic. However, it isn’t too wrong to use hdfs dfs if we are very sure that our Hadoop filesystem of choice isn’t going to change. hadoop dfs is best avoided, as it has already been deprecated.

Posted in api, Hadoop, Technical