Sunday

An impatient start with Cascading

Over the last couple of years working with Hadoop, I have heard about the Cascading framework in many blogs and conferences as a way to implement ETL on top of Hadoop. In our lean start-up project, however, we decided to use Pig and built our data flow on it. Recently I got a new book from O'Reilly Media, "Enterprise Data Workflows with Cascading", and finished two interesting chapters in one breath. This weekend I found a couple of hours to try the examples from the book. The author, Paco Nathan, explains very nicely why and when you should use Cascading instead of Pig or Hive, and he even gives examples to try at home. Today's post is about my first impression of Cascading.

All the examples from the book can be found on GitHub. I cloned the project and was ready to run the examples. The Impatient project compiles and builds with Gradle. I ran gradle clean jar and got stuck on the following errors:
Could not resolve all dependencies for configuration ':providedCompile'.
> Could not download artifact 'org.codehaus.jackson:jackson-core-asl:1.8.8@jar'
   > Artifact 'org.codehaus.jackson:jackson-core-asl:1.8.8@jar' not found.

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output.
I am not a Gradle geek and didn't find a quick fix, so I decided to use Maven to compile and build the examples (however, it would be great if somebody could point me to a fix). Furthermore, the author doesn't describe how to install a standalone local Hadoop; installing Hadoop is certainly out of the scope of the book. However, we have to install a local standalone Hadoop to run the examples. There are plenty of blogs and articles that describe how to install Hadoop, but since this is an impatient start, we will install Hadoop as quickly as possible.
1) Download the Hadoop distribution hadoop-1.0.3.zip.
2) Unzip the Hadoop distribution into any folder.
3) Set the HADOOP_HOME environment variable to that folder and add its bin directory to the PATH.
We are actually ready to go, but it's better to check three files in the conf folder: core-site.xml, hdfs-site.xml and mapred-site.xml. For local use, none of the three files should contain any configuration, as follows:
<configuration/>
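The check on the three conf files can be scripted. This is only a minimal sketch: the helper name check_standalone_conf is hypothetical (it is not from the book or from Hadoop), and it assumes you pass the folder where you unzipped the distribution. It only reads the files and reports any that are missing or still contain a <property> entry.

```shell
# Report which Hadoop site files are missing or still carry <property> entries.
# Argument: the folder where the Hadoop distribution was unzipped (an assumption).
# The function body runs in a subshell, so "exit" only sets the return status.
check_standalone_conf() (
  conf_dir="$1/conf"
  status=0
  for f in core-site.xml hdfs-site.xml mapred-site.xml; do
    if [ ! -f "$conf_dir/$f" ]; then
      echo "missing: $f"
      status=1
    elif grep -q '<property>' "$conf_dir/$f"; then
      echo "not empty: $f"
      status=1
    else
      echo "ok: $f"
    fi
  done
  exit "$status"
)
```

After setting HADOOP_HOME, calling check_standalone_conf "$HADOOP_HOME" should print three "ok" lines for a clean standalone setup.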
4) Add the following pom.xml file to the directory part1:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.blu.bigdata</groupId>
  <artifactId>casscading</artifactId>
  <version>1.0</version>
  <packaging>jar</packaging>

  <name>casscading</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>cascading</groupId>
      <artifactId>cascading-core</artifactId>
      <version>2.1.2</version>
    </dependency>

    <dependency>
      <groupId>cascading</groupId>
      <artifactId>cascading-local</artifactId>
      <version>2.1.2</version>
    </dependency>

    <dependency>
      <groupId>cascading</groupId>
      <artifactId>cascading-hadoop</artifactId>
      <version>2.1.2</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>1.1.2</version>
    </dependency>
  </dependencies>

  <repositories>
    <repository>
      <id>conjars.org</id>
      <url>http://conjars.org/repo</url>
    </repository>
  </repositories>
    <build>
        <plugins>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.2.1</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>impatient.Main</mainClass>
          
                        </manifest>
                    </archive>

                </configuration>
                <executions>
                    <execution>
                        <id>assemble-all</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
5) Run mvn clean install; it should create a fat jar file, casscading-1.0-jar-with-dependencies.jar, in the target directory.
6) Now we are ready to run the example part1:
rm -rf output
hadoop jar ./target/casscading-1.0-jar-with-dependencies.jar data/rain.txt ./output/rain
I got the following output on the console:
13/08/11 20:53:11 INFO util.HadoopUtil: resolving application jar from found main method on: impatient.Main
13/08/11 20:53:11 INFO planner.HadoopPlanner: using application jar: /Users/samim/Development/NoSQL/casscading/./target/casscading-1.0-jar-with-dependencies.jar
13/08/11 20:53:11 INFO property.AppProps: using app.id: 96618C129BE0C7C591007BCFB650BAE6
2013-08-11 20:53:11.650 java[2161:1903] Unable to load realm info from SCDynamicStore
13/08/11 20:53:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13/08/11 20:53:12 WARN snappy.LoadSnappy: Snappy native library not loaded
13/08/11 20:53:12 INFO mapred.FileInputFormat: Total input paths to process : 1
13/08/11 20:53:12 INFO util.Version: Concurrent, Inc - Cascading 2.1.2
13/08/11 20:53:12 INFO flow.Flow: [] starting
13/08/11 20:53:12 INFO flow.Flow: []  source: Hfs["TextDelimited[['doc_id', 'text']->[ALL]]"]["data/rain.txt"]"]
13/08/11 20:53:12 INFO flow.Flow: []  sink: Hfs["TextDelimited[[UNKNOWN]->['doc_id', 'text']]"]["output/rain"]"]
13/08/11 20:53:12 INFO flow.Flow: []  parallel execution is enabled: false
13/08/11 20:53:12 INFO flow.Flow: []  starting jobs: 1
13/08/11 20:53:12 INFO flow.Flow: []  allocating threads: 1
13/08/11 20:53:12 INFO flow.FlowStep: [] at least one sink does not exist
13/08/11 20:53:12 INFO flow.FlowStep: [] source modification date at: Sun Aug 11 18:50:44 MSK 2013
13/08/11 20:53:12 INFO flow.FlowStep: [] starting step: (1/1) output/rain
13/08/11 20:53:12 INFO mapred.FileInputFormat: Total input paths to process : 1
13/08/11 20:53:13 INFO flow.FlowStep: [] submitted hadoop job: job_local_0001
13/08/11 20:53:13 INFO mapred.Task:  Using ResourceCalculatorPlugin : null
13/08/11 20:53:13 INFO io.MultiInputSplit: current split input path: file:/Users/samim/Development/NoSQL/casscading/data/rain.txt
13/08/11 20:53:13 INFO mapred.MapTask: numReduceTasks: 0
13/08/11 20:53:13 INFO hadoop.FlowMapper: cascading version: Concurrent, Inc - Cascading 2.1.2
13/08/11 20:53:13 INFO hadoop.FlowMapper: child jvm opts: -Xmx200m
13/08/11 20:53:13 INFO hadoop.FlowMapper: sourcing from: Hfs["TextDelimited[['doc_id', 'text']->[ALL]]"]["data/rain.txt"]"]
13/08/11 20:53:13 INFO hadoop.FlowMapper: sinking to: Hfs["TextDelimited[[UNKNOWN]->['doc_id', 'text']]"]["output/rain"]"]
13/08/11 20:53:13 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
13/08/11 20:53:13 INFO mapred.LocalJobRunner: 
13/08/11 20:53:13 INFO mapred.Task: Task attempt_local_0001_m_000000_0 is allowed to commit now
13/08/11 20:53:13 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0001_m_000000_0' to file:/Users/samim/Development/NoSQL/casscading/output/rain
13/08/11 20:53:16 INFO mapred.LocalJobRunner: file:/Users/samim/Development/NoSQL/casscading/data/rain.txt:0+510
13/08/11 20:53:16 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
13/08/11 20:53:18 INFO util.Hadoop18TapUtil: deleting temp path output/rain/_temporary
7) To view the results:
cat ./output/rain/*
Here is the output:
doc_id text
doc01 A rain shadow is a dry area on the lee back side of a mountainous area.
doc02 This sinking, dry air produces a rain shadow, or area in the lee of a mountain with less rain and cloudcover.
doc03 A rain shadow is an area of dry land that lies on the leeward (or downwind) side of a mountain.
doc04 This is known as the rain shadow effect and is the primary cause of leeward deserts of mountain ranges, such as California's Death Valley.
doc05 Two Women. Secrets. A Broken Land. [DVD Australia]
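The console log shows the same two fields (doc_id, text) on the source and on the sink, and no reduce tasks, so part1 appears to be a plain copy of the input. Under that assumption, a quick sanity check is to compare the sorted input with the sorted output. The helper name same_records is hypothetical, and the part-* file names are an assumption based on how Hadoop usually names its output files.

```shell
# Compare the lines of an input file with the part files in an output directory.
# Returns 0 when both contain the same lines, ignoring order.
# The function body runs in a subshell, so "exit" only sets the return status.
same_records() (
  input="$1"
  out_dir="$2"
  tmp_in=$(mktemp) || exit 2
  tmp_out=$(mktemp) || exit 2
  sort "$input" > "$tmp_in"
  cat "$out_dir"/part-* | sort > "$tmp_out"
  if cmp -s "$tmp_in" "$tmp_out"; then
    status=0
  else
    status=1
  fi
  rm -f "$tmp_in" "$tmp_out"
  exit "$status"
)
```

For this run the call would be same_records data/rain.txt ./output/rain. If the job ever wrote more than one part file, the header line could repeat and the check would report a difference, so treat a mismatch as a hint rather than proof of an error.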
Everything seems to work fine. I also ran the word count example:
hadoop jar ./target/casscading-1.0-jar-with-dependencies.jar data/rain.txt ./output/wc
I am actually interested in manipulating Cassandra data through Cascading, and also in Lingual. I have already found a few GitHub projects that implement a Cassandra Tap and Sink for easy integration; thank you, guys, for the nice work.