In-Memory MapReduce and Your Hadoop Ecosystem (Part 2)

Portions of this article were taken from the book High-Performance In-Memory Computing With Apache Ignite. If it got you interested, check out the rest of the book for more helpful information.
Before reading, be sure to check out Part 1!
Apache Ignite provides a distributed in-memory file system called the Ignite File System (IGFS), with functionality similar to Hadoop HDFS. This is one of the unique features of Apache Ignite that helps accelerate Big Data computing. IGFS implements the Hadoop file system API and is designed to support both Hadoop v1 and Hadoop v2 (YARN). IGFS can plug transparently into a Hadoop or Spark deployment.
One of the greatest benefits of IGFS is that it does away with the Hadoop NameNode in the Hadoop deployment; it uses Ignite’s in-memory database under the hood to provide automatic scaling and failover without any additional shared storage. IGFS uses memory instead of disk to provide a distributed, fault-tolerant, high-throughput file system. Removing the NameNode from the architecture leads to dramatically better I/O performance. Furthermore, IGFS provides a native file system API for working with directories and files in the in-memory file system.
IgniteFileSystem, the IGFS interface, provides methods for regular file system operations such as create, update, delete, and mkdirs, as well as for executing MapReduce tasks. Another interesting feature of IGFS is its use of file-level caching and eviction. IGFS utilizes file-level caching to ensure corruption-free storage.
Note that IGFS is not merely a RAM disk; it is a fully compliant in-memory file system, like HDFS. A high-level architecture of IGFS is shown below in Figure 1.

In this article, we are going to cover the basic operations of IGFS and deploy it in standalone mode to store files in IGFS and perform a few MapReduce tasks on top of it.
Note: We are not going to replace HDFS completely; otherwise, we would not be able to start the Hadoop DataNode anymore. We are going to use both IGFS and HDFS simultaneously.
From a bird’s-eye view, running MapReduce in IGFS on top of HDFS looks as follows:
  1. Configure the IGFS for the Ignite nodes.
  2. Put files into IGFS.
  3. Configure Hadoop.
  4. Run MapReduce.
There are several ways to configure IGFS on the Ignite cluster. Unfortunately, Apache Ignite doesn’t provide any comprehensive GUI-based management tool or command-line interface for maintaining the Hadoop accelerator. However, GridGain Visor (part of the commercial version of Ignite), as a management tool, provides IGFS monitoring and file management between HDFS, local, and IGFS file systems. To demonstrate how to use IGFS, we will perform the following steps:
  1. Configure the IGFS file system in the Ignite cluster (default-config.xml).
  2. Run a standalone Java application to ingest a file into IGFS. In our case, the file will be t8.shakespeare.txt.
  3. Configure Hadoop.
  4. Run a MapReduce wordcount job to compute the count of the words from the IGFS file.
  5. Run a standalone Java application to check the result of the MapReduce job.
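For readers who have not met the wordcount job from step 4 before, the following JDK-only sketch shows the computation it performs: split the text into whitespace-separated tokens and sum the occurrences of each token. It is an illustration of the idea, not the source of the Hadoop example, and the class and method names are made up for this article.

```java
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

/** JDK-only illustration of the wordcount logic; hypothetical names, no Hadoop involved. */
final class WordCountSketch {

    /** Splits the text on whitespace ("map") and sums the occurrences per token ("reduce"). */
    static Map<String, Integer> countWords(String text) {
        // A TreeMap keeps the tokens sorted, which makes the printed result easy to scan.
        Map<String, Integer> counts = new TreeMap<>();
        StringTokenizer tokens = new StringTokenizer(text);
        while (tokens.hasMoreTokens()) {
            counts.merge(tokens.nextToken(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = countWords("to be or not to be");
        // One line per token: the token, a tab, and its count.
        counts.forEach((word, n) -> System.out.println(word + "\t" + n));
    }
}
```

Note that tokens are case-sensitive and keep their punctuation in this sketch, so "be" and "be," are counted separately.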
Now that we have dipped our toes into IGFS, let’s configure a standalone IGFS and run some MapReduce jobs on it.

Step 1

Add the following Spring configuration beans to the default-config.xml file of the Ignite node:
<bean id="igfsCfgBase" class="org.apache.ignite.configuration.FileSystemConfiguration" abstract="true">
  <property name="blockSize" value="#{128 * 1024}"/>
  <property name="perNodeBatchSize" value="512"/>
  <property name="perNodeParallelBatchCount" value="16"/>
  <property name="prefetchBlocks" value="32"/>
</bean>
<bean id="dataCacheCfgBase" class="org.apache.ignite.configuration.CacheConfiguration" abstract="true">
  <property name="cacheMode" value="PARTITIONED"/>
  <property name="atomicityMode" value="TRANSACTIONAL"/>
  <property name="writeSynchronizationMode" value="FULL_SYNC"/>
  <property name="backups" value="0"/>
  <property name="affinityMapper">
    <bean class="org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper">
      <constructor-arg value="512"/>
    </bean>
  </property>
</bean>
<bean id="metaCacheCfgBase" class="org.apache.ignite.configuration.CacheConfiguration" abstract="true">
  <property name="cacheMode" value="REPLICATED"/>
  <property name="atomicityMode" value="TRANSACTIONAL"/>
  <property name="writeSynchronizationMode" value="FULL_SYNC"/>
</bean>
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
  <property name="discoverySpi">
    <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
      <property name="ipFinder">
        <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
          <property name="addresses">
            <list>
              <value>127.0.0.1:47500..47509</value>
            </list>
          </property>
        </bean>
      </property>
    </bean>
  </property>
  <property name="fileSystemConfiguration">
    <list>
      <bean class="org.apache.ignite.configuration.FileSystemConfiguration" parent="igfsCfgBase">
        <property name="name" value="igfs"/>
        <property name="metaCacheName" value="igfs-meta"/>
        <property name="dataCacheName" value="igfs-data"/>
        <property name="blockSize" value="1024"/>
        <property name="streamBufferSize" value="1024"/>
        <property name="ipcEndpointConfiguration">
          <bean class="org.apache.ignite.igfs.IgfsIpcEndpointConfiguration">
            <property name="type" value="SHMEM"/>
            <property name="host" value="127.0.0.1"/>
            <property name="port" value="10500"/>
          </bean>
        </property>
      </bean>
    </list>
  </property>
  <!-- Caches referenced by metaCacheName and dataCacheName above; assumed here to inherit from the two base beans. -->
  <property name="cacheConfiguration">
    <list>
      <bean class="org.apache.ignite.configuration.CacheConfiguration" parent="metaCacheCfgBase">
        <property name="name" value="igfs-meta"/>
      </bean>
      <bean class="org.apache.ignite.configuration.CacheConfiguration" parent="dataCacheCfgBase">
        <property name="name" value="igfs-data"/>
      </bean>
    </list>
  </property>
</bean>

Next, we have configured a base cache configuration called dataCacheCfgBase, which will be the parent of the IGFS data cache. We have already discussed most of the properties of this configuration. Note that, for demonstration purposes, we have set the number of backups to 0.
Our subsequent configuration is the base configuration for the metadata cache, called metaCacheCfgBase. It is probably the most unfamiliar part of this configuration. IGFS keeps metadata for all files ingested into the in-memory file system. The configuration of this cache is very similar to the previous base cache configuration.
Next, we configure the IGFS file system itself, which is the main part of the Ignite configuration. We set the name of the file system to igfs. The block size and the stream buffer size will both be 1024 bytes. To let IGFS accept requests from Hadoop, an endpoint must be configured. Ignite offers two endpoint types:
  1. shmem: Working over shared memory (not available on Windows).
  2. tcp: Working over standard socket API.
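To get a feel for what a block size of 1024 bytes means in practice, the sketch below works through the arithmetic for a hypothetical 5 MiB file. It assumes that the mapper configured with a group size of 512 keeps runs of 512 consecutive blocks together; the class name, the method names, and the file size are illustrative only and are not taken from Ignite.

```java
/** Back-of-the-envelope block arithmetic; hypothetical helper, not part of the Ignite API. */
final class IgfsBlockMath {

    /** Number of fixed-size blocks needed to hold the given number of bytes (ceiling division). */
    static long blockCount(long fileSizeBytes, int blockSizeBytes) {
        return (fileSizeBytes + blockSizeBytes - 1) / blockSizeBytes;
    }

    /** Number of groups when groupSize consecutive blocks are kept together (ceiling division). */
    static long groupCount(long blocks, int groupSize) {
        return (blocks + groupSize - 1) / groupSize;
    }

    public static void main(String[] args) {
        long fileSize = 5L * 1024 * 1024; // hypothetical 5 MiB file, not the real size of t8.shakespeare.txt
        int blockSize = 1024;             // blockSize from the IGFS configuration
        int groupSize = 512;              // constructor argument of IgfsGroupDataBlocksKeyMapper

        long blocks = blockCount(fileSize, blockSize);
        long groups = groupCount(blocks, groupSize);
        System.out.println(blocks + " blocks in " + groups + " groups"); // 5120 blocks in 10 groups
    }
}
```

With such a small block size, even a modest file turns into thousands of blocks, which is fine for a demonstration but worth revisiting for real workloads.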

Step 2

Once each Ignite node is configured (default-config.xml), start every node with the following command:
$ ignite.sh

Step 3

In this step, we are going to ingest our t8.shakespeare.txt file into the IGFS file system. As described before, we will use a Java application to ingest the file into IGFS. The application is very simple; it ingests the t8.shakespeare.txt file each time it is launched. It takes the name of the directory and the file name as input parameters and puts the file into IGFS. Open the pom.xml file and add the following to the dependencies section.
<dependency>
  <groupId>org.apache.ignite</groupId>
  <artifactId>ignite-core</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.ignite</groupId>
  <artifactId>ignite-spring</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.ignite</groupId>
  <artifactId>ignite-hadoop</artifactId>
  <version>1.6.0</version>
</dependency>
<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <version>19.0</version>
</dependency>
Now, add a new Java class with the name IngestFileInIGFS. The full listing of the Java class is shown below:
import com.google.common.io.ByteStreams;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteFileSystem;
import org.apache.ignite.Ignition;
import org.apache.ignite.igfs.IgfsPath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class IngestFileInIGFS {
    private static final Logger LOGGER = LoggerFactory.getLogger(IngestFileInIGFS.class);
    private static final String IGFS_FS_NAME = "igfs";

    public static void main(String... args) {
        if (args.length < 2) {
            LOGGER.error("Usage: java -jar chapter-bigdata-1.0-SNAPSHOT.jar DIRECTORY_NAME FILE_NAME, "
                + "for example: java -jar chapter-bigdata-1.0-SNAPSHOT.jar myDir myFile");
            System.exit(1);
        }
        // Client mode has to be set before the node is started.
        Ignition.setClientMode(true);
        Ignite ignite = Ignition.start("default-config.xml");

        // Log the names of all file systems configured on the cluster.
        for (IgniteFileSystem igniteFileSystem : ignite.fileSystems()) {
            LOGGER.info("IGFS File System name:" + igniteFileSystem.name());
        }
        IgniteFileSystem igfs = ignite.fileSystem(IGFS_FS_NAME);
        // Create directory.
        IgfsPath dir = new IgfsPath("/" + args[0]);
        igfs.mkdirs(dir);
        // Path of the file to create inside that directory.
        IgfsPath file = new IgfsPath(dir, args[1]);
        // Read the Shakespeare file from the classpath and write it to IGFS;
        // both streams are closed automatically.
        try (InputStream inputStream = IngestFileInIGFS.class.getClassLoader()
                 .getResourceAsStream("t8.shakespeare.txt");
             OutputStream out = igfs.create(file, true)) {
            out.write(ByteStreams.toByteArray(inputStream));
            LOGGER.info("Created file path:" + file.toString());
        } catch (IOException e) {
            LOGGER.error(e.getMessage());
        }
    }
}
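The listing above reads the whole file into a byte array before writing it, which is fine for a file of this size. For larger files, a chunked copy keeps memory use bounded. The following JDK-only sketch shows the idea; the class and method names are hypothetical, and in the ingest application the output stream would be the one returned by igfs.create(file, true).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** Chunked stream copy; hypothetical helper that works with any InputStream/OutputStream pair. */
final class ChunkedCopy {

    /** Copies everything from in to out using a fixed-size buffer and returns the byte count. Closes neither stream. */
    static long copy(InputStream in, OutputStream out, int bufferSize) throws IOException {
        byte[] buffer = new byte[bufferSize];
        long total = 0;
        int read;
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
            total += read;
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        // In-memory streams stand in for the classpath resource and the IGFS output stream.
        InputStream in = new ByteArrayInputStream(new byte[10_000]);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long copied = copy(in, out, 1024);
        System.out.println("Copied " + copied + " bytes");
    }
}
```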
To compile and run the application, execute the following commands:
mvn clean install
java -jar ./target/IngestFileInIGFS.jar myDir myFile
After the Maven project compiles successfully, the executable JAR files will be in the target folder. The IngestFileInIGFS.jar file is the one that ingests a file into IGFS.

Step 4

It’s time to configure Hadoop so that it knows about the IGFS file system.
Let’s create a new directory under $HADOOP_HOME/etc and copy all the files from the existing hadoop configuration directory into it. Execute the following commands:
$ cd $HADOOP_HOME/etc
$ mkdir hadoop-ignite
$ cp ./hadoop/*.* ./hadoop-ignite
Remove all the properties from $HADOOP_HOME/etc/hadoop-ignite/core-site.xml and add the following:
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>igfs://igfs@127.0.0.1:10500/</value>
  </property>
  <property>
    <name>fs.igfs.impl</name>
    <value>org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem</value>
  </property>
</configuration>
The fully qualified file system class name org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem is sufficient for configuring IGFS for Hadoop.
Note: v1 and v2 don’t stand for Hadoop 1.x and Hadoop 2.x. Instead, they refer to the old FileSystem API and the new AbstractFileSystem API, respectively.
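The fs.defaultFS value packs several pieces of information into one URI. Assuming the form igfs://name@host:port/, the JDK-only sketch below splits such a value into its parts with java.net.URI; the class and method names are hypothetical. It also shows that a value with a third slash after the scheme parses with an empty authority, so it is worth checking the exact form against the documentation of your Ignite version.

```java
import java.net.URI;

/** Splits an IGFS-style URI into its parts; hypothetical helper built on java.net.URI only. */
final class IgfsUriParts {

    /** Returns a one-line summary of the URI components that carry the IGFS name and endpoint. */
    static String describe(String value) {
        URI uri = URI.create(value);
        return "scheme=" + uri.getScheme()
            + ", igfs=" + uri.getUserInfo()   // the part before '@' is parsed as user info
            + ", host=" + uri.getHost()
            + ", port=" + uri.getPort()
            + ", path=" + uri.getPath();
    }

    public static void main(String[] args) {
        // Two slashes: name, host and port end up in the authority part.
        System.out.println(describe("igfs://igfs@127.0.0.1:10500/"));
        // Three slashes: the authority is empty and everything moves into the path.
        System.out.println(describe("igfs:///igfs@127.0.0.1:10500/"));
    }
}
```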
At this point, the Hadoop configuration is complete, and we are ready to execute MapReduce jobs.

Step 5

There are several ways to execute MapReduce jobs with a specific Hadoop configuration. One of the easiest is to pass the Hadoop configuration directory as an input parameter to the job:
hadoop --config [path_to_config] [arguments]

Let’s run our wordcount MapReduce job with the file from the IGFS with the following command:

time hadoop --config /home/user/hadoop/hadoop-2.7.2/etc/hadoop-ignite \
  jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar \
  wordcount /myDir/myFile /myDir/out

After running the above command, you should see output in your terminal similar to that shown below.


Note that you have to change the name of the output directory every time you run the MapReduce job.
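One way to avoid renaming the output directory by hand is to append a per-run suffix, such as a timestamp, when building the command. The sketch below only assembles the argument list; the class name, the method name, and the suffix convention are assumptions made for this illustration, and the paths mirror the command used in this step.

```java
import java.util.Arrays;
import java.util.List;

/** Builds the wordcount invocation with a unique output directory; hypothetical helper. */
final class WordCountCommand {

    /** Returns the hadoop argument list; outputSuffix is appended to outputBase to make it unique per run. */
    static List<String> build(String hadoopHome, String input, String outputBase, long outputSuffix) {
        return Arrays.asList(
            "hadoop",
            "--config", hadoopHome + "/etc/hadoop-ignite",
            "jar", hadoopHome + "/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar",
            "wordcount", input, outputBase + "-" + outputSuffix);
    }

    public static void main(String[] args) {
        // Falls back to the installation path used in this article when HADOOP_HOME is not set.
        String hadoopHome = System.getenv().getOrDefault("HADOOP_HOME", "/home/user/hadoop/hadoop-2.7.2");
        List<String> command = build(hadoopHome, "/myDir/myFile", "/myDir/out", System.currentTimeMillis());
        System.out.println(String.join(" ", command));
        // To launch it from Java: new ProcessBuilder(command).inheritIO().start().waitFor();
    }
}
```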
In this simple way, you can replace Hadoop HDFS with IGFS.
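The last step of the plan, checking the result of the job, comes down to reading the output file and parsing its lines. Hadoop's text output writes one key and value per line, separated by a tab. The sketch below parses such lines from any Reader; the class and method names are hypothetical. In a real check, the reader would wrap the stream returned by IgniteFileSystem.open for the job's output file, which we assume to be something like /myDir/out/part-r-00000.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.Map;

/** Parses wordcount output lines of the form word<TAB>count; hypothetical helper. */
final class WordCountResultReader {

    /** Reads all lines from the source, skipping lines without a tab, and closes the source. */
    static Map<String, Long> parse(Reader source) throws IOException {
        Map<String, Long> counts = new LinkedHashMap<>(); // keeps the order of the file
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                int tab = line.lastIndexOf('\t');
                if (tab < 0) {
                    continue; // blank or malformed line
                }
                counts.put(line.substring(0, tab), Long.parseLong(line.substring(tab + 1).trim()));
            }
        }
        return counts;
    }

    public static void main(String[] args) throws IOException {
        String sample = "be\t2\nnot\t1\nor\t1\nto\t2\n"; // made-up sample, not real job output
        parse(new StringReader(sample)).forEach((word, n) -> System.out.println(word + " -> " + n));
    }
}
```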
