
    AWS Lambda Invasion in Big Data

    February 22, 2016

    AWS Lambda (Lambda), a compute service built to automatically scale applications or backend services through event-driven programming, can play a major role in implementing, as well as re-engineering, the big data solutions popularly associated with the Hadoop framework. Lambda is not a complete substitute for the evolving Hadoop ecosystem; rather, it can be a powerful alternative for some components of a big data architecture. Popular and commonly implemented big data use cases fall under behavioral analytics, predictive analytics, customer sentiment analysis, customer segmentation and fraud detection. Social networks like Facebook, Google+, Twitter, and YouTube play a big role, along with website and mobile application interactions. Typically, the solution can be divided into four stages: collect, store, process and analyze, with different AWS services and flows joining them together. Lambda can be used in all four stages; however, it plays its most significant role in the store and process stages, combined with other components in the architecture. Here are some of the top features and benefits that make Lambda worth considering in a big data solution architecture:

    • Eliminating the need for the infrastructure for large-scale distributed processing, which is traditionally handled by the Master-Slave-Task instances of the Hadoop framework.
    • Application code with event handler code replacing the traditional map-reduce type of programming in a number of industry standard programming languages.
    • Ability to trigger events, including real-time streaming integrated with services such as Amazon DynamoDB, S3, Kinesis streams, SNS, and Cloudwatch.
    • On-demand service, scalability, elasticity, pay-per-use (reducing capex), security, lower overall cost of processing, and the ability to support volume and velocity at almost unlimited capacity are what attract many companies to adopt or try out big data solutions on the AWS platform. Typically, the AWS big data platform revolves around Amazon EMR (EMR); however, Lambda is by design engineered to deliver these same characteristics.
    • AWS Lambda and Amazon Kinesis can be used to process real-time streaming data for application activity tracking, transaction order processing, click stream analysis, data cleansing, metrics generation, log filtering, indexing, social media analysis and IoT device data processing (a minimal handler sketch follows this list).
    • NoOps - Lambda functions can also be invoked on demand, and all the compute capacity and resources needed are automatically managed by AWS, which spins up the necessary infrastructure and manages the code and its execution.
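    As an illustration of the Kinesis integration mentioned above, here is a minimal sketch of a Python handler attached to a Kinesis stream. The event shape (base64-encoded payloads under event['Records']) is what Lambda delivers for Kinesis triggers; the 'event_type' field is a hypothetical attribute of the incoming records.

    import base64
    import json

    def lambda_handler(event, context):
        # Lambda delivers Kinesis records in batches; each payload arrives
        # base64-encoded in record['kinesis']['data']
        for record in event['Records']:
            payload = base64.b64decode(record['kinesis']['data'])
            message = json.loads(payload)
            # Filter, enrich, or aggregate here before forwarding downstream
            print('Processed event: {}'.format(message.get('event_type')))
        return 'Processed {} records.'.format(len(event['Records']))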

    A simple word count test was conducted using both EMR and Lambda, with input collected from Twitter. Some comparison information between the two approaches is documented below.

    Word Count Test

    Process the input file(s) in one S3 bucket and summarize each word and its count in another bucket. A separate Lambda function was used to collect the data from Twitter: it retrieves the output for the latest 1000 tweets on the status timeline and stores it in the S3 bucket used as the input for this processing.

    Option 1: Simple Reference Architecture with AWS Lambda

    The following diagram depicts a simple use case of social media or log file analysis in which the data text files are available in an S3 bucket as input and processed by Lambda functions, with the processed results written to another S3 bucket. Typically, this can be done as a two-step process similar to EMR, with a Map-Scatter function to distribute inputs and a Reduce-Gather function to process them (a hypothetical fan-out sketch follows). The code snippets later in this post provide an alternative by doing both in the same function.
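    For reference, here is a hedged sketch of what that Map-Scatter fan-out step could look like: a controller function lists the input objects and asynchronously invokes a worker function per object, much as EMR distributes map tasks. The bucket and function names ('twitter-input-bucket', 'word-count-worker') are hypothetical placeholders.

    import json
    import boto3

    s3 = boto3.client('s3')
    lam = boto3.client('lambda')

    def scatter_handler(event, context):
        # Fan out: one asynchronous worker invocation per input object
        paginator = s3.get_paginator('list_objects')
        for page in paginator.paginate(Bucket='twitter-input-bucket'):
            for obj in page.get('Contents', []):
                lam.invoke(
                    FunctionName='word-count-worker',
                    InvocationType='Event',  # asynchronous, fire-and-forget
                    Payload=json.dumps({'bucket': 'twitter-input-bucket',
                                        'key': obj['Key']})
                )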

    Lambda function-1: Collect data from Twitter

    For this function, select Lambda, then click new function and skip the blueprint to get to the configure screen. These are the fields that need to be filled:

    <span style="font-weight: 400;">Name - load_from_twitter_status</span><span style="font-weight: 400;">Runtime – Node.js</span><span style="font-weight: 400;">Code entry type – Upload .ZIP file</span><span style="font-weight: 400;">Handler - &nbsp;&nbsp;index.handler</span><span style="font-weight: 400;">Role – You can create a new role “s3 execution role” or select an existing role. Iin both cases, ensure required or all s3 actions are available to create object in a S3 bucket.</span><span style="font-weight: 400;">Memory – Leave the default 128 MB.</span><span style="font-weight: 400;">Timeout – Keep sufficient time 0 minutes and 30 seconds. Or, 59 seconds can be a good start.</span>

    Node zip instructions: The zip file should be created from inside the node project so that index.js, package.json and node_modules sit at the same level, along with any other files as needed. Here are some snippets from index.js that can be useful in building this function.

    <span style="font-weight: 400;">//This is required for the handler to invoke the code with in this</span>
    <span style="font-weight: 400;">exports.handler = function(event, context) {</span><span style="font-weight: 400;">var AWS = require('aws-sdk');</span><span style="font-weight: 400;">// Obtain twitter application authorization details from twitter.com</span><span style="font-weight: 400;">var twit = new twitter({</span><span style="font-weight: 400;"> &nbsp;&nbsp;&nbsp;    consumer_key: 'abc…………….’,</span><span style="font-weight: 400;">    consumer_secret: 'epE……………………………………………………….',</span><span style="font-weight: 400;">    access_token_key: '3216……-Qza…………………….',</span><span style="font-weight: 400;">    access_token_secret: 'jn……………………………………………………………….'</span><span style="font-weight: 400;">});</span>
    <span style="font-weight: 400;">doCall(twit, function(response){</span><span style="font-weight: 400;">&nbsp;</span> <span style="font-weight: 400;">//Convert to response type to String or other allowed data types for s3 put object</span><span style="font-weight: 400;"> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;</span> <span style="font-weight: 400;">var twitoutput = JSON.stringify(response);</span><span style="font-weight: 400;"> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;</span> <span style="font-weight: 400;">// Set name of the object that gets generated</span><span style="font-weight: 400;"> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;var s3key = “objectname…..”;</span>                    <span style="font-weight: 400;">var params = {Bucket: 'myBucket/input', Key: s3key, Body: twitoutput}</span><span style="font-weight: 400;">      &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;</span> <span style="font-weight: 400;">s3bucket.putObject(params, function(err, data) </span><span style="font-weight: 400;"> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;</span><span style="font-weight: 400;">console.log("Entered put bucket function");</span><span style="font-weight: 400;"> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;</span> <span style="font-weight: 400;">if (err) {</span><span style="font-weight: 400;">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;</span> <span style="font-weight: 400;"> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;</span><span style="font-weight: 400;">console.log("Error uploading data: ", err);</span><span style="font-weight: 400;">                                          } else {</span><span style="font-weight: 400;">                                            console.log("Successfully uploaded data to bucket/sub-bucket/");</span><span style="font-weight: 400;">                    // context.done will allows for the s3 operation to be complete as context.succeed might end up </span> <span style="font-weight: 400;">                    // closing the function before the feedback is arrived from s3</span><span style="font-weight: 400;">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;</span>                         <span style="font-weight: 400;">context.done();</span><span style="font-weight: 400;">&nbsp;</span> <span style="font-weight: 400;">}</span><span style="font-weight: 400;">                                           });</span><span style="font-weight: 400;">                                           });</span><span style="font-weight: 400;">                                     })</span>
    <span style="font-weight: 400;">function doCall(twit, callback) {</span><span style="font-weight: 400;">// call twitter API functions and return the required tweets</span><span style="font-weight: 400;">var params = {count: 1000};</span><span style="font-weight: 400;">twit.get('statuses/home_timeline', params, function(error, tweets, response){</span><span style="font-weight: 400;"> &nbsp;if (!error) { &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;</span><span style="font-weight: 400;"> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;</span> <span style="font-weight: 400;">return callback(tweets);</span><span style="font-weight: 400;">             &nbsp;}</span><span style="font-weight: 400;">if(error) {</span><span style="font-weight: 400;"> &nbsp;var fail="failed";</span><span style="font-weight: 400;">  return callback(fail);</span><span style="font-weight: 400;">          }</span><span style="font-weight: 400;">});</span>

    Function execution time:

    REPORT RequestId: 0b93ed00-cc72-11e5-a52c-75fb3dd236ce Duration: 8848.41 ms Billed Duration: 8900 ms Memory Size: 128 MB Max Memory Used: 36 MB

    Lambda function-2: Process data word count

    Click Lambda, then new function, and choose the blueprint s3-get-object-python. Choose the S3 bucket for input files and Event Type – Object created (All). These are the fields that need to be filled:

    <span style="font-weight: 400;">Name - load_from_twitter_status</span><span style="font-weight: 400;">Runtime – Python 2.7</span><span style="font-weight: 400;">Code entry type – Edit code inline</span><span style="font-weight: 400;">Handler - &nbsp;&nbsp;lambda_function.lambda_handler</span>Role:<span style="font-weight: 400;"> You can create a new role “s3 execution role” or select an existing role and in both cases ensure required or all s3 actions are available to create object in a S3 bucket.</span><span style="font-weight: 400;">Memory – Leave the default 128 MB</span><span style="font-weight: 400;">Timeout – Keep sufficient time 0 minutes and 30 seconds or 59 seconds can be a good startfrom __future__ import print_functionfrom time import strftimeimport jsonimport urllibimport boto3import res3 = boto3.client('s3')s3r = boto3.resource('s3')def lambda_handler(event, context): bucket = event['Records'][0]['s3']['bucket']['name'] key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key']).decode('utf8') try: response = s3.get_object(Bucket=bucket, Key=key) data = response['Body'].read() pattern = re.compile("[a-zA-Z][a-zA-Z0-9]*") wordcount={} word_freq = {} for word in pattern.findall(data): lowerword = word.lower(); if lowerword not in wordcount: wordcount[lowerword] = 1 else: wordcount[lowerword] += 1 list = sorted(wordcount.keys()) for word in list: print ("%-10s %d" % (word, wordcount[word])) y="" for word in list: y=y+"{}t{}n".format(word, wordcount[word])  timekey = strftime("%Y-%m-%d %H:%M:%S") time = strftime("%H:%M:%S") s3r.Bucket('my-target-bucket-name').put_object(Key=timekey, Body=y) except Exception as e: print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket)) raise e</span>

    Function execution time:

    REPORT RequestId: 10920cac-cdc0-11e5-aa64-e7de12986527 Duration: 1623.36 ms Billed Duration: 1700 ms Memory Size: 128 MB Max Memory Used: 31 MB

    Option 2: EMR processing

    The following diagram depicts a simple use case of log file analysis in which the data text files are available in an Amazon S3 bucket as input and processed by an Amazon EMR cluster, with the processed data being written to another S3 bucket.

    In the AWS console, click EMR, click "Create Cluster", then click "Go to advanced options". Leave the software configuration defaults as is, select a streaming program in the add steps option, and click configure.

    Accept all defaults in the General options screen. EC2 key pair: accept the default or choose one of your keys.
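    The streaming step configured above needs mapper and reducer scripts; the originals are not shown in this post, but a minimal word-count mapper in the style of the AWS wordSplitter example might look like the sketch below, with Hadoop's built-in aggregate reducer summing the emitted counts.

    #!/usr/bin/python
    # mapper.py - minimal Hadoop streaming mapper for word count. Emitting
    # "LongValueSum:<word>\t1" lets the built-in aggregate reducer sum the counts.
    import re
    import sys

    pattern = re.compile("[a-zA-Z][a-zA-Z0-9]*")
    for line in sys.stdin:
        for word in pattern.findall(line):
            print("LongValueSum:" + word.lower() + "\t1")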

    Step execution details (log file: controller will provide these details):

    2016-02-06T03:01:44.084Z INFO Step created jobs: job_1454727481331_0001
    2016-02-06T03:01:44.084Z INFO Step succeeded with exitCode 0 and took 66 seconds

    Tasks for: s-2EOXKPVVPA3TU, Job 1454727481331_0001

    Task summary: 23 total tasks - 23 completed (17 Map / 6 Reduce), 0 running, 0 failed, 0 pending, 0 cancelled.

    Reference: http://aws.amazon.com/articles/Elastic-MapReduce/2273

    Conclusion

    The table below summarizes the comparison of the two approaches:

    Storage
    • EMR approach: S3 buckets for input and output
    • Lambda approach: S3 buckets for input and output

    Processing
    • EMR approach: 3 EC2 m3.xlarge instances (1 master and 2 core), 2 security groups for master and core, and 1 key pair for accessing the instances
    • Lambda approach: 1 Lambda function for processing the input and storing the output, and 1 role with a policy allowing it to read objects from S3 and write objects to S3 (prefixing the bucket name in the policy is suggested to allow all S3 actions)

    Cost model
    • EMR approach: Charges are incurred for all the master and core instances in the EMR cluster.
    • Lambda approach: There is no administration cost, and charges are only for the compute time consumed while the code is running.

    Time for processing
    • EMR approach: 66 seconds, with 17 map jobs and 6 reduce jobs
    • Lambda approach: Max 2200 ms (about 2 seconds)

    Cost comparison
    • EMR approach: For the standard 3 instances at 25% utilization in a month, m3.xlarge is $192 (or m1.large, $120). For 2 instances it would be 2/3 of that cost.
    • Lambda approach: 10,950 requests, assuming 3000 ms each at 128 MB of memory, will cost less than $1 (https://aws.amazon.com/lambda/pricing/); a worked estimate follows the table.
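    As a back-of-the-envelope check of that last row, the arithmetic below uses Lambda's published pay-per-use rates at the time of writing ($0.20 per million requests and $0.000000208 per 100 ms at 128 MB), ignoring the free tier:

    requests = 10950
    billed_ms = 3000.0                            # assumed billed duration per request

    compute_units = requests * (billed_ms / 100)  # Lambda bills in 100 ms increments
    compute_cost = compute_units * 0.000000208    # published 128 MB rate per 100 ms
    request_cost = requests * (0.20 / 1000000)

    print("compute ${:.4f} + requests ${:.4f} = ${:.4f}".format(
        compute_cost, request_cost, compute_cost + request_cost))
    # -> roughly $0.07 in total, comfortably under $1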

    Note: Typically, for huge volumes of time-sensitive real-time streaming data, tools like Amazon Kinesis can be used in combination with Lambda. Similarly, the storage choices can include S3 for files, Kinesis for streams, RDS Aurora and DynamoDB for transactional data, or Redshift for supporting analysis. The AWS Lambda service, with its unique features of NoOps, serverless computing and continuous scaling, can be disruptive in big data solutions, and depending on the use case it can be very cost effective as well. It is therefore worthwhile to consider Lambda when architecting and designing big data use cases.
