Thursday, August 18, 2016

Quick one-two on how to create an EMR cluster with a custom bootstrap action (BA)

This is more a reference for me than for those reading this. I know I'm going to have to do it again in the future, and I really don't want to run into the same problems I had getting this to work the first time.

So, the AWS CLI command to create the cluster:

myBA='--bootstrap-action Path="s3://< my-s3-bucket >/shell/install_profile.sh"'

aws emr create-cluster --release-label emr-5.0.0 --name testBA-10 \
    --applications Name=Hive Name=Spark Name=Zeppelin Name=Ganglia \
    --ec2-attributes KeyName=my-Keypair \
    --region us-east-1 --use-default-roles \
    --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m3.xlarge \
                      InstanceGroupType=CORE,InstanceCount=1,InstanceType=m3.xlarge \
    ${myBA}

The shell variable myBA holds the --bootstrap-action option, which points at the S3 location of the script the BA runs. It's worth noting that the script should finish with an exit status of 0 if you don't want your BA (and with it, the cluster launch) to fail. Here's my script:

   #!/usr/bin/env bash
   set -x
   set -e

   HADOOP_HOME=/home/hadoop

   # My profile
   aws s3 cp s3://< my S3 bucket >/AWSStuff/std/myfuncs ${HADOOP_HOME}
   echo "About to modify my .bashrc login"
   echo -e "\n. ~/myfuncs\ngetNewProfile\n. .hbw" >> ${HADOOP_HOME}/.bashrc

   # Install byobu
   sudo /usr/bin/yum -y --enablerepo=epel install byobu

   # And we're ready to rock
   exit 0

Bingo. We're ready to roll.
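
Once create-cluster returns a cluster id, a quick way to keep an eye on the cluster (and therefore the BA) coming up is something along these lines - the cluster id is a placeholder for whatever create-cluster printed:

   # Poll the cluster state; it should move through STARTING/BOOTSTRAPPING to WAITING.
   aws emr describe-cluster --cluster-id j-XXXXXXXXXXXX \
       --query 'Cluster.Status.State' --output text

If the BA misbehaves, its stdout/stderr should (from memory) land under /mnt/var/log/bootstrap-actions/ on the node, and under the cluster's S3 log URI if you enabled logging.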

Sunday, July 17, 2016

s3-dist-cp and regexes and the groupBy switch


I spent an inordinate amount of time trying to work out how s3-dist-cp [and more specifically, the groupBy flag] works, because you would think that the reference to the Wikipedia link on regexes would mean that Perl-compatible regular expressions would work.

Not so fast there Tonto.....

It took me a good deal of experimenting to find this out. Hopefully this blog post helps you save some time.... Please post your saved time to me at the address at the bottom of this post! Here's the test layout I set up under s3://s3-lab/test-split/:

test-split/split-1/full
                  /full.1
                  /full.10
                  /full.11
                  /full.12
test-split/split-2/full.14
                  /full.15
                  /full.16
                  /full.17
                  /full.18
test-split/split-3/full.2
                  /full.20
                  /full.21
                  /full.22
                  /full.23
test-split/split-4/full.25
                  /full.26
                  /full.27
                  /full.28
                  /full.3
test-split/split-5/full.5
                  /full.6
                  /full.7
                  /full.8
                  /full.9

Whatever is captured within the '()' is 'saved' and used to name the output.

I tried this regex:
--groupBy '(full).1[5-7]'

expecting that it would groupBy
full.15, full.16 and full.17

Nope, it didn't work until I did this:
--groupBy '.*(full).1[5-7]*'

So it seems the regex is looking for anything (.*) prior to 'full' too - the full path!

So --groupBy isn't the partial-match RE I was expecting it to be: it has to match the entire key!
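
You can see the same whole-match behaviour locally with a quick grep sanity check (grep -Ex forces a whole-line match, which appears to be what groupBy does against the full key):

   # The bare pattern doesn't cover the path, so it fails; the .*-anchored one works.
   echo "test-split/split-2/full.15" | grep -Ex '(full).1[5-7]'       # no match
   echo "test-split/split-2/full.15" | grep -Ex '.*(full).1[5-7]*'    # match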

Once I'd figured out that these regexes aren't quite the regexes I expected, I tried this:

s3-dist-cp --src s3://s3-lab/test-split/ --srcPattern='.*split-[12].*' --dest hdfs:///user/hadoop/PBXfull/ --groupBy ".*(full).1[0-9]*"

In the above, I expected s3-dist-cp to use the source pattern .*split-[12].* to pick the directories and then the groupBy to select the files WITHIN those directories. Something was wrong though, because the output ended up [seemingly] at random in split-2 or split-1. I renamed the directories to foo, baz, bar, ipsum, lorum as this was closer to my use-case.....

test-split/foo/full
              /full.1
              /full.10
              /full.11
              /full.12
test-split/bar/full.14
              /full.15
              /full.16
              /full.17
              /full.18
test-split/baz/full.2
              /full.20
              /full.21
              /full.22
              /full.23
test-split/ipsum/full.25
                /full.26
                /full.27
                /full.28
                /full.3
test-split/lorum/full.5
                /full.6
                /full.7
                /full.8
                /full.9

Then I ran this:

s3-dist-cp --src s3://s3-lab/test-split/ --srcPattern='.*(foo|bar|baz).*' --dest hdfs:///user/hadoop/PBXfull/s2/ --groupBy ".*(full).1[0-9]*"
     
This DIDN'T do what I expected (I expected it to grab foo, baz and bar, sifting out only the files full.10, full.11, etc).

So I tried this instead:

s3-dist-cp --src s3://s3-lab/test-split/ --srcPattern='.*baz.*' --dest hdfs:///user/hadoop/PBXfull/s2/ --groupBy ".*(full).1[0-9]*"

And now it worked. The remaining wrinkle is that with this srcPattern the output lands in PBXfull/s2/baz, whereas with the (foo|bar|baz) pattern all the output was dumped into PBXfull/s2/foo....

But it doesn't end there....
What I would really like is for all the files in foo to be concatenated, then all the files in baz, then all the files in bar, and so on - not one large file.

Thanks to Barbaros, who got to the solution; I'm documenting it here.

So in trying to work this out, I tried:

   s3-dist-cp --src=s3://s3-lab/test-split/foo/ --dest=hdfs:///user/hadoop/PBXfull/ --groupBy=".*(full)\..*"

This joined the files in test-split/foo together into a file 'full', but clearly this is going to break if I try to join the files in bar, baz, etc...

Let's test a couple of options:

   s3-dist-cp --src=s3://s3-lab/test-split/ --dest=hdfs:///user/hadoop/PBXfull/ --groupBy=".*(full)\..*"

Yup. It creates a file (full) combining all the files and outputs it to one (presumably the last) directory (in my case, I still have split-5 knocking around).

   s3-dist-cp --src=s3://s3-lab/test-split/ --srcPattern=".*bar.*" --dest=hdfs:///user/hadoop/PBXfull/ --groupBy=".*(full)\..*"

Yup. This does what I want: bar is concatenated into a file full at PBXfull/bar/full, but there's still a problem. The main one is that I have 10000 of these foo, bar, baz directories and I really want every one in its own directory, so I can't be specifying the srcPattern every time. It'll drive me crazy....crazier...
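
(The brute-force alternative would be a wrapper loop something like the sketch below - directory names pulled from a listing, destination paths assumed from the layout above - which is exactly the sort of thing I want to avoid:)

   # One s3-dist-cp run per directory: works, but that's 10000 invocations...
   for d in $(aws s3 ls s3://s3-lab/test-split/ | awk '/PRE/ {print $2}' | tr -d '/'); do
       s3-dist-cp --src=s3://s3-lab/test-split/${d}/ \
                  --dest=hdfs:///user/hadoop/PBXfull/${d}/ \
                  --groupBy=".*(full)\..*"
   done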

In the end, I tested Barbaros' solution:

   s3-dist-cp --src=s3://s3-lab/test-split/ --dest=hdfs:///user/hadoop/PBXfull/ --groupBy=".*/(\\w+)/.*"

The groupBy here won't group by the filename full.*, but WILL group by the directory foo, bar, baz, etc....i.e. all the full.* files will be concatenated and grouped by the directory.
 
This produced:
   PBXfull/foo/foo [note the filename is now foo, not full]
   PBXfull/bar/bar [again, the filename is bar, not full]
etc.

The groupBy here is grouping by the directory and not the filename!

Wow! So nothing is quite what it appears to be.

A final example:
  s3-dist-cp --src=s3://s3-lab/test-split/ --dest=hdfs:///user/hadoop/PBXfull/ --groupBy=".*/(split)\-([0-9])/.*(full)\.1[0-9]*"

This, in theory, groups by split-1, split-2, etc. Or not. The output is:

   PBXfull/split-2/split2full

And this makes sense because:
   the first part of the groupBy, (split)\-([0-9]),
   only matches the split-1, split-2, etc. directories,
   and the second part, .*(full)\.1[0-9]*,
   only matches the files:
      full.10, full.11, full.12, etc.

And there are only 2 directories with these files in them, namely split-1 and split-2, which means the last matching 'split-' directory is where the output of the files goes - hence split-2/split2full.
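
To see where the name split2full comes from, here's a quick local illustration using sed as a stand-in for the regex engine (I'm assuming, based on the behaviour above, that s3-dist-cp simply concatenates the captured groups to form the group/output name):

   # The three capture groups - "split", "2", "full" - joined together give the output name.
   echo "test-split/split-2/full.15" | sed -E 's|.*/(split)-([0-9])/.*(full)\.1[0-9]*|\1\2\3|'
   # -> split2full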

Ok, don't worry about sending me the time you spent reading this post :-), but hopefully it helps someone (perhaps even me) sometime in the future.

Wednesday, May 25, 2016

Loading CSV data into DynamoDB using Hive on EMR

To complete this recipe, you will need:

* An AWS account and an EMR cluster with Hive installed (it's installed by default when launching an EMR 4.6 cluster)
* Some data in CSV format that you can load into an S3 bucket (2)
* A DynamoDB table into which you plan to load this data (note: you need not define all the column names up front - just the key schema, i.e. the hash key and, if you use one, the range key)

The data I'm going to use is a set of data that looks as follows:
   id; fortune; randomNumber

I generated this using the fortune(1) program on Linux; the random number comes from $RANDOM in bash, and the id is just a running count (a rough sketch of the generation loop follows the sample below).

   1;Q:Why did the germ cross the microscope? A:To get to the other slide.;22913
   2;Do not sleep in a eucalyptus tree tonight.;2448
   ...
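
Something along these lines will generate data of that shape (a sketch - it assumes the fortune package is installed and that flattening each fortune onto one line and dropping any embedded semicolons is acceptable):

   # Build fortunes.csv: id;fortune;randomNumber - one record per line.
   for i in $(seq 1 1000); do
       f=$(fortune | tr '\n' ' ' | tr -d ';')
       echo "${i};${f};${RANDOM}"
   done > fortunes.csv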

The general gist of this is to load the data into S3, create a meta-table in Hive over the S3 data, create another meta-table over the DynamoDB table, and then "insert" into the latter by "select"-ing from the former.

Upload the data to s3 using the AWS cli:

 $ aws s3 cp fortunes.csv s3://*my-s3-bucket*/mydata/
  
Once you've created the EMR cluster, log into the master node using ssh:

  ssh hadoop@ec2-52-3-247-254.compute-1.amazonaws.com

Now it's time to create your meta-table in Hive for the S3 data. Type

  $ hive

(to get into the hive prompt)
 
  hive> CREATE EXTERNAL TABLE fortunes_from_s3 (id bigint, fortune string, randomNum bigint) ROW FORMAT DELIMITED FIELDS TERMINATED BY "\;" LOCATION 's3://*my-s3-bucket*/mydata';  

At this point, you should be able to "select" from this meta-table. This is a good test:
  select * from fortunes_from_s3;

If you get results that you expect from your data in S3, then we're half-way there.

Now it's time to create an external meta-table (hive_test_table) over the DynamoDB table, through which we'll load the data.

  hive> CREATE EXTERNAL TABLE hive_test_table (ID bigint, Fortune string, RandomNum bigint) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "", "dynamodb.column.mapping" = "ID:id,Fortune:fortune,RandomNum:randomnum"); (1)


Note that one needs to map the fields, in this case:
  ID maps to id (in DDB), Fortune maps to fortune (in DDB), etc., where ID and Fortune are the Hive meta-table columns and id and fortune are the corresponding DynamoDB attributes.


We've reached the last stage: loading the data into DynamoDB. To do this, one "inserts" from the "select":

  hive> INSERT OVERWRITE TABLE hive_test_table SELECT * FROM fortunes_from_s3;

There will be a load of stuff that'll fly by as it loads the data:

  Query ID = hadoop_20160525155656_2ad0b238-f350-4805-8d8a-ebe9646f276c
  Total jobs = 1
  Launching Job 1 out of 1
  Number of reduce tasks is set to 0 since there's no reduce operator
  Starting Job = job_1463217542741_0016, Tracking URL = http://ip-192-168-0-122.ec2.internal:20888/proxy/application_1463217542741_0016/
  Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1463217542741_0016
  Hadoop job information for Stage-0: number of mappers: 1; number of reducers: 0
  2016-05-25 15:56:34,246 Stage-0 map = 0%,  reduce = 0%
  2016-05-25 15:56:51,858 Stage-0 map = 100%,  reduce = 0%, Cumulative CPU 4.31 sec
  MapReduce Total cumulative CPU time: 4 seconds 310 msec
  Ended Job = job_1463217542741_0016
  MapReduce Jobs Launched:
  Stage-Stage-0: Map: 1   Cumulative CPU: 4.31 sec   HDFS Read: 200 HDFS Write: 0   SUCCESS
  Total MapReduce CPU Time Spent: 4 seconds 310 msec
  OK
  Time taken: 32.273 seconds



It's time to look in DynamoDB to check whether the data are loaded. If they're there, then the job is done.
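
A quick way to eyeball this from the command line (the table name is a placeholder for whatever you put in dynamodb.table.name):

  # Pull back a handful of items to confirm the load worked.
  aws dynamodb scan --table-name my-fortunes-table --limit 5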

Permissions

IAM may well stop you from doing this job. In order to make sure the data can be loaded, create a new role in IAM. This role should have, at minimum, the following policies attached:

  AmazonElasticMapReduceRole
  AmazonElasticMapReduceforEC2Role


This will ensure that EMR can get at the data in S3. Attach this role to the DynamoDB table under the Access Control policies for the table.
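
If you prefer the CLI, attaching the managed policies to an existing role looks roughly like this (the role name is made up, and the ARNs are the standard AWS-managed service-role policy ARNs as best I recall them):

  # Attach the two EMR managed policies to the role.
  aws iam attach-role-policy --role-name myEMR-DDB-Role \
      --policy-arn arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceRole
  aws iam attach-role-policy --role-name myEMR-DDB-Role \
      --policy-arn arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceforEC2Role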

Gotchas

(1) The dynamodb.column.mapping can catch one out. The ID:id,.... should not have spaces between the various column mappings (i.e. after the "," and before the next column mapping).
(2) Make sure that the CSV file is the only thing under that S3 path and don't specify the actual file name in the LOCATION; Hive will read whatever it finds there.