Operationalizing Data Science Models on the Pivotal Stack

August 11, 2016

 

Posted by Srivatsan Ramanujam

Categories: Data Science, Greenplum, SCDF, PCF, GemFire

(Joint work by Srivatsan Ramanujam, Regunathan Radhakrishnan, Jin Yu, and Kaushik Das)

At Pivotal Data Science, our primary charter is to help our customers derive value from their data assets, be it by reducing cost or by increasing revenue through better products and services. When we are not working on customer engagements, we engage in R&D using our wide array of products. For instance, we may contribute a new module to PDLTools or MADlib - our distributed in-database machine learning libraries - build end-to-end demos such as these, or experiment with new technology and blog about them here.

Last quarter, we set out to explore data science microservices for operationalizing our models for real-time scoring. Microservices have been among the most talked-about topics at many Cloud conferences of late, and they’ve gained a large following among application developers, solution architects, data scientists and engineers alike. For a primer on microservices, we encourage you to check out the free e-book from Matt Stine on “Migrating to Cloud Native Application Architectures”. For web-scale companies like Facebook, Google, Amazon or Netflix, building and deploying data science models might be second nature thanks to years of R&D investment in their technology stack. However, enterprises embracing open source platforms like Cloud Foundry and big data processing environments like Greenplum, HAWQ or Geode need quick wins to demonstrate business value through easily deployable data science model training and scoring pipelines. We set out to explore how easy it was to build two such model training and scoring pipelines using open source toolkits from the Pivotal technology stack, including Pivotal CF and Pivotal BDS.

Objective

In this blog, we describe the data science microservices we built for real-time scoring and link you to all the relevant code so that you can build it out yourself on your own PCF and BDS environments. Our goal was to describe a reference architecture for operationalizing some of our data science models and to demonstrate a proof of concept by building it out. These should by no means be considered production-ready models, nor are the microservices necessarily 12-Factor compliant. They serve to illustrate the process of deploying data science models on a PaaS like PCF.

Data science models

Text analytics is a common theme in many of our customer engagements. We’ve applied text analytics and NLP techniques to predict churn, understand customer satisfaction, and distill hundreds of thousands of call center transcriptions into higher-level topics. We described some of these techniques by applying them to a fun problem of analyzing tweets for the NY primaries earlier this year (Data Science 101: Using Election Tweets to Understand Text Analytics).

For this work, we used a sentiment scoring model for tweets and doc2vec/word2vec models for generating topically related tweets using a KNN algorithm. We used the implementations in gensim to train our doc2vec model for obtaining vector representations of tweets. This notebook gives a more detailed walkthrough of training these models.

Our goal is to demonstrate how to convert the scoring functions of the sentiment analyzer and the tweet recommender into microservices and invoke them through a stream processing engine like Spring Cloud Dataflow. All the components of the scoring functions can be deployed on Cloud Foundry to reap the benefits of the Cloud Native architecture that Matt describes in his book.

Reference Architecture

We usually spend between 6 and 12 weeks on customer engagements. Once a business problem has been identified and all relevant data sources have been loaded into a data lake, we proceed to build statistical models using big data toolkits like MADlib and PDLTools and procedural languages like PL/Python and PL/R. We use Jupyter notebooks extensively, with all the heavy lifting happening on a cluster in the backend and all exploratory data analysis and plotting happening on the client. Jupyter notebooks make the process of knowledge transfer and hand-off to our customers effective. Since most of a data scientist’s time is spent in building, monitoring and updating models, there should be a simple mechanism for deploying these models for scoring. The LinkedIn engineering team talked about this “last mile” problem in their SIGMOD ’13 paper in greater detail. We wrote a Jupyter magic command to automate model deployment. Our scoring engine is a collection of microservices built using Spring Boot. We use Spring Cloud Dataflow as our stream processing engine and GemFire as our in-memory data grid to cache the data from our analytic data warehouse (Greenplum) and perform lightweight in-memory analytics. All the components of our scoring pipeline run within Cloud Foundry. The analytics data warehouse (Greenplum) can be within or outside of Cloud Foundry, as the models can be trained and iterated upon offline.

The input to our scoring pipeline is a stream of live tweets, buffered using RabbitMQ (which can be substituted with Kafka). Our pipeline computes a sentiment score for these tweets and also identifies the most similar tweets from a historical database of tweets.

Environment

We spun up a PCF environment on vSphere and, through the OpsManager, installed GemFire for PCF, Redis for PCF, and RabbitMQ for PCF, all downloadable from PivNet. To install and instantiate GemFire, you’ll also need to install the GemFire CLI for CF plugin. Detailed instructions on setting up your PCF environment can be found below. Our Greenplum data warehouse (GPDB) was a single-node instance running in a Linux box that was on the same network as our PCF instance.

Initialization of PCF CLI

Log in as admin to the “pds” org and choose “dsmicroservices” as the default space

sramanujam@srivatsan-ramanujams-macbook-pro:~/Downloads|
⇒  cf login --skip-ssl-validation  -a https://api.run.haas-23.pez.pivotal.io
API endpoint: https://api.run.haas-23.pez.pivotal.io

Email> admin

Password> 
Authenticating...
Credentials were rejected, please try again.

Password> 
Authenticating...
OK

Select an org (or press enter to skip):
1. system
2. pds

Org> 2
Targeted org pds

Select a space (or press enter to skip):
1. development
2. dsmicroservices

Space> 2
Targeted space dsmicroservices


API endpoint:   https://api.run.haas-23.pez.pivotal.io (API version: 2.43.0)   
User:           admin   
Org:            pds   
Space:          dsmicroservices   

Create new users and assign them appropriate roles in the new space

sramanujam@srivatsan-ramanujams-macbook-pro:~/Downloads|
⇒  cf create-user sramanujam@pivotal.io "****"
Creating user sramanujam@pivotal.io...
OK

TIP: Assign roles with 'cf set-org-role' and 'cf set-space-role'
sramanujam@srivatsan-ramanujams-macbook-pro:~/Downloads|
⇒  cf set-org-role sramanujam@pivotal.io  pds  OrgManager
Assigning role OrgManager to user sramanujam@pivotal.io in org pds as admin...
OK
sramanujam@srivatsan-ramanujams-macbook-pro:~/Downloads|
⇒  cf set-space-role sramanujam@pivotal.io  pds  dsmicroservices SpaceDeveloper
Assigning role SpaceDeveloper to user sramanujam@pivotal.io in org pds / space dsmicroservices as admin...
OK
sramanujam@srivatsan-ramanujams-macbook-pro:~/Downloads|
⇒

cf create-user rradhakrishnan@pivotal.io "****"
cf set-org-role rradhakrishnan@pivotal.io  pds  OrgManager
cf set-space-role rradhakrishnan@pivotal.io  pds  dsmicroservices SpaceDeveloper

cf create-user jyu@pivotal.io "****"
cf set-org-role jyu@pivotal.io  pds  OrgManager
cf set-space-role jyu@pivotal.io  pds  dsmicroservices SpaceDeveloper

Check out the marketplace and instantiate the GemFire service

⇒  cf marketplace
Getting services from marketplace in org pds / space dsmicroservices as admin...
OK

service          plans                                                           description   
app-autoscaler   bronze, gold                                                    Scales bound applications in response to load   
p-gemfire        GemFireServicePlan1, GemFireServicePlan2, GemFireServicePlan3   Pivotal GemFire offers the ability to deploy a GemFire cluster as a service in Pivotal CF.   
p-rabbitmq       standard                                                        RabbitMQ is a robust and scalable high-performance multi-protocol messaging broker.   
p-redis          shared-vm, dedicated-vm                                         Redis service to provide a key-value store   

TIP:  Use 'cf marketplace -s SERVICE' to view descriptions of individual plans of a given service.
sramanujam@srivatsan-ramanujams-macbook-pro:~/Downloads|
⇒  cf marketplace -s p-gemfire
Getting service plan information for service p-gemfire as admin...
OK

service plan          description               free or paid   
GemFireServicePlan1   GemFire plan 1 instance   free   
GemFireServicePlan2   GemFire plan 2 instance   free   
GemFireServicePlan3   GemFire plan 3 instance   free   
sramanujam@srivatsan-ramanujams-macbook-pro:~/Downloads|
⇒  cf create-service p-gemfire GemFireServicePlan1  dsmicro_gemfire
Creating service instance dsmicro_gemfire in org pds / space dsmicroservices as admin...
OK

Create in progress. Use 'cf services' or 'cf service dsmicro_gemfire' to check operation status.
sramanujam@srivatsan-ramanujams-macbook-pro:~/Downloads|
⇒  cf services
Getting services in org pds / space dsmicroservices as admin...
OK

name              service      plan                  bound apps   last operation   
dsmicro_gemfire   p-gemfire    GemFireServicePlan1                create succeeded   
dsmicro_rabbit    p-rabbitmq   standard                           create succeeded   
dsmicro_redis     p-redis      shared-vm                          create succeeded   
sramanujam@srivatsan-ramanujams-macbook-pro:~/Downloads|

Install GemFire CLI plugin for PCF

Download GemFire CF CLI plugin : https://docs.pivotal.io/gemfire-cf/gfe-cli.html

sramanujam@srivatsan-ramanujams-macbook-pro:~/Downloads|
⇒  chmod +x ~/Downloads/cf-gemfire-cli-darwin-amd64-1.6
chmod: /Users/sramanujam/Downloads/cf-gemfire-cli-darwin-amd64-1.6: No such file or directory
sramanujam@srivatsan-ramanujams-macbook-pro:~/Downloads|
⇒  chmod +x ~/Downloads/cf-gemfire-cli-darwin-amd64-1.6.0 
sramanujam@srivatsan-ramanujams-macbook-pro:~/Downloads|
⇒  cf install-plugin ~/Downloads/cf-gemfire-cli-darwin-amd64-1.6.0 

**Attention: Plugins are binaries written by potentially untrusted authors. Install and use plugins at your own risk.**

Do you want to install the plugin /Users/sramanujam/Downloads/cf-gemfire-cli-darwin-amd64-1.6.0? (y or n)> y

Installing plugin /Users/sramanujam/Downloads/cf-gemfire-cli-darwin-amd64-1.6.0...
OK
Plugin GemFire v1.7.0 successfully installed.
sramanujam@srivatsan-ramanujams-macbook-pro:~/Downloads|
⇒  cf plugins
Listing Installed Plugins...
OK

Plugin Name   Version   Command Name          Command Help   
GemFire       1.7.0     gemfire               GemFire plugin command's help text   
GemFire       1.7.0     show-gfsh             Show gfsh connect command   
GemFire       1.7.0     restart-gemfire       Restart GemFire cache servers (Also used for applying configuration changes)   
GemFire       1.7.0     export-gemfire        Retrieve GemFire artifacts, such as logs and stats   
GemFire       1.7.0     show-wan-config-url   Display the WAN configuration URL for the service instance   
sramanujam@srivatsan-ramanujams-macbook-pro:~/Downloads|
⇒  cf gemfire
NAME:
   cf gemfire - Cloud Foundry plugin to interact with GemFire service instances

USAGE:
   cf [global options] command [command options] [arguments...]

VERSION:
   1.7.0.0


COMMANDS:
   show-gfsh        SERVICE_INSTANCE
   restart-gemfire    SERVICE_INSTANCE --cluster-config CONFIG.ZIP | -c CONFIG.ZIP
   export-gemfire    SERVICE_INSTANCE
   show-wan-config-url    SERVICE_INSTANCE
   help, h        Shows a list of commands or help for one command


GLOBAL OPTIONS:
   --help, -h        show help
   --version, -v    print the version

sramanujam@srivatsan-ramanujams-macbook-pro:~/Downloads|
⇒

Restart GemFire, get connection information etc.

sramanujam@srivatsan-ramanujams-macbook-pro:~/Downloads|
⇒  cf restart-gemfire dsmicro_gemfire
This command could take a while. Timeout set to: 900 seconds.
..................
Cluster successfully restarted
sramanujam@srivatsan-ramanujams-macbook-pro:~/Downloads|
⇒  cf show-gfsh dsmicro_gemfire
Use the following command in gfsh to connect to service instance dsmicro_gemfire
    connect --use-http --url=https://gf-plan-1-dashboard-XXXXX.run.haas-23.pez.pivotal.io/gemfire/v1
For a non HTTPS connection (not recommended -- use with caution!) use the following command:
    connect --use-http --url=http://gf-plan-1-dashboard-XXXXX.run.haas-23.pez.pivotal.io/gemfire/v1
sramanujam@srivatsan-ramanujams-macbook-pro:~/Downloads|
⇒

Spring Cloud Data Flow

Spring Cloud Dataflow (SCDF) is a framework for creating composable data microservices. This makes it easy to create data ingestion pipelines, real-time analytics pipelines, and more. The framework has the following components:

  • SCDF shell

  • SCDF server (Dataflow Server)

  • A target runtime such as Cloud Foundry or YARN

  • Spring Boot applications that run as data sources, processors or sinks within the target runtime

  • A binder (RabbitMQ/Kafka) for message passing between these Spring Boot applications.

From the SCDF shell, one can connect to an SCDF server and register Spring Boot applications as sources, processors or sinks using the app register command. In the figure above, http is a Spring Boot application acting as a data source and cassandra is another Spring Boot application acting as a data sink. One can then create an SCDF stream consisting of sources, processors and sinks (e.g. http | cassandra) from the SCDF shell, which gets submitted to the SCDF server. The SCDF server running in the target runtime is responsible for spinning up the Spring Boot applications and ensuring that the messaging pipeline is set up between these applications. The messaging pipeline can be RabbitMQ or Kafka.

In this project, we wanted to create a SCDF pipeline for the following two analytic tasks:

  • Sentiment Scoring of Tweets
  • Recommender for Tweets

To start off, you’ll first need to deploy the SCDF server into your PCF environment, bring up the SCDF shell, register your modules and create the appropriate streams. Sample instructions for these are below.

Download SCDF binaries

You can download the SCDF server and the shell from here.

Deploy SCDF server and bind Rabbit, Redis

    cf push dsm-dataflow-server --no-start -p spring-cloud-dataflow-server-cloudfoundry-1.0.0.M4.jar
    cf bind-service dsm-dataflow-server dsm-redis
    cf bind-service dsm-dataflow-server dsm-rabbit
    cf set-env dsm-dataflow-server SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_URL https://api.run.haas-23.pez.pivotal.io
    cf set-env dsm-dataflow-server SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_ORG "pds"
    cf set-env dsm-dataflow-server SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_SPACE dsm
    cf set-env dsm-dataflow-server SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_DOMAIN cfapps.haas-23.pez.pivotal.io
    cf set-env dsm-dataflow-server SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_SERVICES dsm-redis,dsm-rabbit
    cf set-env dsm-dataflow-server SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_USERNAME sramanujam@pivotal.io
    cf set-env dsm-dataflow-server SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_PASSWORD "*******"
    cf set-env dsm-dataflow-server SPRING_CLOUD_DEPLOYER_CLOUDFOUNDRY_SKIP_SSL_VALIDATION true
    cf start dsm-dataflow-server

Start the SCDF shell and connect to the SCDF server

    java -jar spring-cloud-dataflow-shell-1.0.0.RELEASE.jar
    dataflow config server http://dsm-dataflow-server.cfapps.haas-23.pez.pivotal.io/

Sample code for registering apps (modules)

    app register --name grtngsource --type source  --uri http://dsm-pez.cfapps.haas-23.pez.pivotal.io/img/GreetingSource-0.0.1-SNAPSHOT-rabbit.jar
    app register --name logsink  --type sink  --uri http://dsm-pez.cfapps.haas-23.pez.pivotal.io/img/LoggingSink-0.0.1-SNAPSHOT-rabbit.jar

Sample code for creating a stream

    stream create --name greettest --definition "grtngsource | logsink" --deploy

In the following sections, we will describe the custom SCDF components that we developed to create these two pipelines.

Sentiment Scoring Of Tweets

We performed the following steps to assign a sentiment score to each tweet:

  • Tokenization and part-of-speech tagging of tweets using gp-ark-tweet-nlp - a PL/Java wrapper on Ark-Tweet-NLP from CMU, that enables us to perform part-of-speech tagging at scale on our MPP databases.

  • Extract two-word phrases that match a set of part-of-speech patterns, based on a modification of Turney’s algorithm (see the sketch after this list).

  • Compute a sentiment score for the extracted phrases based on mutual information.

  • Finally, each tweet gets a score based on the phrases present in that tweet. This method of computing sentiment scores is completely unsupervised and can be bootstrapped from a dictionary of positive and negative adjectives.
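
As a rough illustration of the phrase-extraction step, the sketch below scans the one-character Ark-Tweet-NLP part-of-speech tags of a tweet and keeps the token spans whose tag trigrams match an allowed pattern. The class name and the pattern set are illustrative only and are not the exact rules of our modified Turney implementation; following the worked example in the next section, the sketch keeps the whole matching trigram.

  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.HashSet;
  import java.util.List;
  import java.util.Set;

  public class TurneyPhraseExtractor {

      // Illustrative tag-trigram patterns only (A = adjective, R = adverb, N = noun in Ark-Tweet-NLP tags).
      private static final Set<String> PATTERNS = new HashSet<>(Arrays.asList("A A A", "A A N", "R A N"));

      // tokens and tags are parallel lists produced by the tokenizer and the POS tagger.
      public static List<String> extractPhrases(List<String> tokens, List<String> tags) {
          List<String> phrases = new ArrayList<>();
          for (int i = 0; i + 2 < tokens.size(); i++) {
              String tagTrigram = tags.get(i) + " " + tags.get(i + 1) + " " + tags.get(i + 2);
              if (PATTERNS.contains(tagTrigram)) {
                  phrases.add(tokens.get(i) + " " + tokens.get(i + 1) + " " + tokens.get(i + 2));
              }
          }
          return phrases;
      }
  }

For the tweet used as an example below, hello(!) beautiful(A) fantastic(A) happy(A) world(N), this sketch would extract “beautiful fantastic happy”.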

The figure above illustrates all the data microservices that we developed within SCDF and GemFire to deploy a pipeline that computes sentiment scores in real time on incoming tweets.
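
Conceptually, once the custom modules below are registered with the app register command shown earlier, the pipeline boils down to an SCDF stream definition along these lines (the module names here are illustrative rather than the exact names we registered):

    stream create --name tweet-sentiment --definition "twitterstream | tweet-tokenizer | sentiment-compute | rest-api | log" --deploy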

  • Twitter Source: This is a built-in SCDF component that connects to a Twitter account and acts as a source of tweets. This component takes Twitter credentials and Twitter stream properties as input. More details can be found here.

  • Tweet Tokenization: This SCDF component takes in a tweet and performs the following transformations:

    1. Tokenization
    2. Part of speech tagging using ArkTweetNLP
    3. Extraction of phrases according to the modified Turney algorithm for sentiment scoring

    For instance, if the tweet is “hello beautiful fantastic happy world”, it would be tokenized and tagged as hello(!) beautiful(A) fantastic(A) happy(A) world(N). Then we would search through all possible trigrams and choose the subset that matches the rules of the modified Turney algorithm. In this example, it would extract the phrase “beautiful fantastic happy”.

  • Sentiment Compute Processor: This SCDF component takes in the matching phrases present in each tweet and sends a REST API request to GemFire to retrieve the polarity of those phrases. The polarity of phrases, computed by analyzing Twitter training data in Pivotal’s GPDB, is cached in GemFire. Finally, the average polarity of all the matching phrases in a tweet is reported as the sentiment score for the incoming tweet.

The snippet to accomplish this is shown below:

  float Senti_score = (float) 0.0;
  // Look up the polarity of each matching phrase from GemFire and accumulate it.
  for (int i = 0; i < mp_obj.matching_phrases.size() - 1; i++) {
      String mp_polarity = restTemplate.getForObject(
              gemfire_phrase_polarity_uri + mp_obj.matching_phrases.get(i), String.class);
      parseJSON_forscore(mp_polarity);
      Senti_score = Senti_score + Float.valueOf(ret_senti_score);
  }
  // Report the average polarity of the matching phrases as the tweet's sentiment score.
  if (mp_obj.matching_phrases.size() > 1) {
      Senti_score = Senti_score / (mp_obj.matching_phrases.size() - 1);
  }
  result = message + "Sentiment score:" + Senti_score;
  return result;
  • REST API: This SCDF component is a simple processor that takes the incoming message and exposes it at a REST endpoint. The snippet to accomplish this is shown below:
  import org.springframework.cloud.stream.annotation.EnableBinding;
  import org.springframework.cloud.stream.messaging.Processor;
  import org.springframework.integration.annotation.ServiceActivator;
  import org.springframework.web.bind.annotation.RequestMapping;
  import org.springframework.web.bind.annotation.RestController;

  @RestController
  @EnableBinding(Processor.class)
  public class RestApiEndPoint {

      public String message = "hello";

      // Pass each incoming message downstream and remember the latest one.
      @ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
      public String log(String message) {
          this.message = message;
          System.out.println(message);
          return message;
      }

      // Expose the latest message at the application's root REST endpoint.
      @RequestMapping("/")
      public String index() {
          System.out.println(this.message);
          return this.message;
      }
  }
  • Logging Sink: This SCDF component is a simple sink end-point for debugging purposes. The complete snippet for this component is given below:
  import org.springframework.cloud.stream.annotation.EnableBinding;
  import org.springframework.cloud.stream.annotation.StreamListener;
  import org.springframework.cloud.stream.messaging.Sink;

  @EnableBinding(Sink.class)
  public class LoggingSink {

      @StreamListener(Sink.INPUT)
      public void log(String message) {
          System.out.println(message);
      }
  }

Note that within this implementation of LoggingSink there is no explicit mention of the binder (RabbitMQ/Kafka). This also holds true for the other SCDF components. The choice of binder is specified in the pom.xml file when building the Spring Boot application; the implementation of the application is independent of the messaging pipeline, and you don’t have to change a line of code to switch the same Spring Boot application from Kafka to RabbitMQ.
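
For instance, with the Spring Cloud Stream starters, the swap is typically just a Maven dependency change along these lines (an illustrative snippet, not our exact pom.xml):

  <!-- Build against the RabbitMQ binder... -->
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  </dependency>
  <!-- ...or swap in the Kafka binder without touching the application code. -->
  <!--
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
  </dependency>
  -->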

Recommender for Tweets

All of the components are the same as the components in the sentiment scoring pipeline except for the Tweet Recommender processor. This SCDF component takes individual tokens from the tweet. For instance, if the tokens are {W1,W2…Wn} then for each Wi, we send a REST API request to GemFire for the corresponding wordvector Vi. Then, all the Vi’s are averaged to obtain a vector representation for the tweet (Vavg). Finally, we make another GemFire REST API call for finding K-Nearest neighbors to retrieve similar tweets to the incoming tweets.
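
A rough sketch of the averaging step is shown below. The restTemplate field, the wordVecUri endpoint, and the parseWordVec helper (which would pull the space-separated vector string out of GemFire’s JSON response) are hypothetical placeholders, not the exact names from our processor:

  // Average the GemFire word vectors of the tweet tokens to obtain a tweet-level vector (Vavg).
  public double[] averageWordVectors(List<String> tokens) {
      double[] avg = null;
      int found = 0;
      for (String token : tokens) {
          // e.g. GET <wordVecUri>/<token> returns a JSON entry containing a space-separated vector string.
          String json = restTemplate.getForObject(wordVecUri + "/" + token, String.class);
          String vecString = parseWordVec(json);   // hypothetical JSON-parsing helper
          if (vecString == null) {
              continue;                            // skip tokens with no cached word vector
          }
          String[] parts = vecString.trim().split("\\s+");
          if (avg == null) {
              avg = new double[parts.length];      // dimension taken from the first vector seen
          }
          for (int i = 0; i < parts.length; i++) {
              avg[i] += Double.parseDouble(parts[i]);
          }
          found++;
      }
      if (avg != null) {
          for (int i = 0; i < avg.length; i++) {
              avg[i] /= found;
          }
      }
      return avg;
  }

The resulting vector is then serialized back into a space-separated string and passed to the KNN function described in the GemFire section below.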

GemFire

GemFire is a distributed in-memory data grid that provides low latency data access, in-memory computing along with other distributed data management capabilities. A GemFire cluster (a distributed system) typically consists of three key components:

  • Locators keep track of cluster members, and provide load-balancing services.

  • Cache servers (or nodes) are mainly used for hosting data and executing GemFire processes. User-defined Java functions can be registered on cache servers (known as server-side functions), and executed in parallel on distributed servers hosting relevant data. Such a data-aware function execution mechanism enables fast parallel processing.

  • Regions are analogous to tables in a relational database. They store data as key-value pairs. A region can be replicated or partitioned across multiple cache servers, and can also be persisted to disk.

GemFire provides a command-line utility, gfsh, for cluster configuration and management. Alternatively, configuration can be done using cache.xml and gemfire.properties files; for example, you can create regions in cache.xml. Cache servers in a GemFire cluster can share the same cache.xml, or have their own individual cache configurations.

In this project we used GemFire as the real-time tier to surface analytics results produced on GPDB to Spring Boot applications. To that end, we created a result publishing microservice on GemFire, along with a Jupyter “publish” magic command to execute the microservice from a Jupyter notebook. We also implemented GemFire microservices as server-side functions for lightweight in-memory analytics. Through GemFire REST APIs, Spring Boot applications can query cached analytics results and invoke server-side functions without having to pull data over to the application side for processing.

GemFire-GPDB Connector

The GemFire-GPDB connector is a GemFire extension that supports parallel import and export of data between GemFire regions and GPDB tables; see this presentation for a quick introduction. Using the GemFire-GPDB connector, we built a result publishing microservice that publishes analytics results produced on GPDB (including phrase polarity/sentiment scores and vector representations of words and tweets) to GemFire to support streaming applications.

The connector transfers data directly between GPDB segments and GemFire cache servers in parallel via GPDB’s external table interface, while the control of the data movement is communicated from a GemFire server to the GPDB master node via a JDBC connection. Each GPDB table is mapped to a GemFire region. The mapping and the specification of the JDBC connection are given as part of the cache configuration in cache.xml. The example configuration below maps the “sentiment_score” table to the “SentimentScore” region. An empty region named “SentimentScore” will be created upon the start of the GemFire cluster. The JDBC connection is bound to a JNDI data source backed by GPDB.

  ...
  <jndi-bindings>
    <jndi-binding jndi-name="datasource" type="SimpleDataSource"
                  jdbc-driver-class="org.postgresql.Driver" user-name="dbuser"
                  password="dbpass" connection-url="jdbc:postgresql://10.64.20.1:5433/gemfire">
    </jndi-binding>
  </jndi-bindings>
  <region name="SentimentScore">
    <region-attributes refid="PARTITION">
      <partition-attributes redundant-copies="1" />
    </region-attributes>
    <gpdb:store datasource="datasource">
      <gpdb:synchronize mode="manual" persistent="false" />
      <gpdb:types>
        <gpdb:pdx name="io.pivotal.ds.gemfire.gpdb.entity.SentimentScore"
                  table="sentiment_score">
          <gpdb:id>
            <gpdb:field ref="phrase" />
          </gpdb:id>
          <gpdb:fields>
            <gpdb:field name="phrase" column="phrase" />
            <gpdb:field name="score" column="score" />
          </gpdb:fields>
        </gpdb:pdx>
      </gpdb:types>
    </gpdb:store>
  </region>
  ...

Once the GemFire cluster is configured, loading data from GPDB to GemFire can simply be implemented as a GemFire server-side function, as shown below:

  public class LoadFromGPDB extends RegionFunctionAdapter implements Declarable {

      private static final Logger log = LogManager.getLogger();
      public static final String ID = "LoadFromGPDB";

      @Override
      public boolean isHA() {
          return false;
      }

      @Override
      public String getId() {
          return ID;
      }

      @Override
      public void init(Properties props) {
      }

      /**
       * Load data from GPDB to a user-specified GemFire region.
       */
      @Override
      public void execute(RegionFunctionContext arg0) {

          try {
              Cache cache = CacheFactory.getAnyInstance();
              GpdbService gpdb = GpdbService.getInstance(cache);

              Region<Object, ?> region = arg0.getDataSet();

              // Clear the region before loading the table from GPDB.
              clearRegion(region);

              // Load the table from GPDB.
              long count = gpdb.importRegion(region);
              log.info("Loaded " + count + " rows from GPDB on region " + region.getName());

              arg0.getResultSender().lastResult(ID + ": Loaded " + count + " entities on region " + region.getName());

          } catch (Exception e) {
              e.printStackTrace();
              log.error(e.getMessage());
          }
      }

      /**
       * Clear a region.
       *
       * @param region
       *            the region to be cleared.
       */
      public void clearRegion(Region<Object, ?> region) {

          Set<Object> keys;

          if (PartitionRegionHelper.isPartitionedRegion(region)) {
              keys = PartitionRegionHelper.getLocalPrimaryData(region).keySet();
          } else {
              keys = region.keySet();
          }

          for (Object key : keys) {
              region.remove(key);
          }

      }

  }

The “execute” method performs the work of the microservice. When executed on a specific region, it clears the region before loading the corresponding GPDB table. The function can be invoked from the gfsh shell:

gfsh>execute function --id=LoadFromGPDB --region=/SentimentScore

or via GemFire REST interface, e.g.:

curl -i -X POST http://localhost:15000/gemfire-api/v1/functions/LoadFromGPDB?onRegion=SentimentScore

Once data is loaded into a GemFire region, it can be conveniently queried through the REST API:

jyu@:Server$ curl -i -X GET http://localhost:15000/gemfire-api/v1/SentimentScore/inconveniently%2Csmall%20part
HTTP/1.1 200 OK
Date: Mon, 25 Jul 2016 06:00:41 GMT
Content-Location: http://localhost:15000/gemfire-api/v1/SentimentScore/small%20part,inconveniently
Content-Type: application/json
Content-Length: 148
Server: Jetty(9.2.13.v20150730)

{
  "SentimentScore" : [ {
    "phrase" : "small part",
    "score" : "0.053"
  }, {
    "phrase" : "inconveniently",
    "score" : "-1.541"
  } ]
}

Jupyter Magic

From a data scientist’s perspective, it is desirable to be able to run the result publishing microservice in the same environment used to develop the analytics models. GemFire REST APIs make this possible. We wrote a Jupyter magic command for loading a GPDB table into GemFire from within a Jupyter notebook: the “publish” command simply sends a REST API request to GemFire to load the “sentiment_score” table into its corresponding GemFire region. Check out this GitHub project for the complete implementation.
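
A minimal sketch of such a magic is given below. It assumes the GemFire REST API endpoint and the LoadFromGPDB server-side function described above; the GemFire base URL is a placeholder, and this is not the exact implementation from the linked GitHub project:

  import requests
  from IPython.core.magic import register_line_magic

  GEMFIRE_API = "http://localhost:15000/gemfire-api/v1"  # placeholder endpoint

  @register_line_magic
  def publish(region):
      """Load the GPDB table mapped to `region` into GemFire, e.g. %publish SentimentScore."""
      url = "{0}/functions/LoadFromGPDB?onRegion={1}".format(GEMFIRE_API, region.strip())
      response = requests.post(url)
      response.raise_for_status()
      print(response.text)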

In-Memory Analytics

In this project we also exploited GemFire’s in-memory computing capability for real-time analytics microservices. The bulk of the analytics work is done on GPDB in an offline fashion, and GemFire is primarily used as an in-memory data store to surface the results from GPDB. But in scenarios where real-time analytics on streaming data is desired, it makes sense to push analytics to the memory tier as well. This is the case for the tweet recommender introduced earlier: for each incoming tweet, it recommends similar historical tweets in real time. This can be formulated as a K-nearest-neighbor (KNN) search problem. We implemented the KNN search as a GemFire server-side function. Given a tweet, the function returns K similar tweets that are cached in a partitioned GemFire region.

A GemFire function can be executed on a particular region or on a particular set of servers. For a region-dependent function, GemFire automatically determines which servers host the data and only distributes the work to the relevant servers for parallel execution. This flexibility in deciding where to run a function makes it easy to implement the KNN search in a MapReduce fashion. In the Map phase, a region-dependent KNN search is carried out across multiple servers, finding local KNNs in parallel on each partition of the region storing the tweets. In the Reduce phase, the local KNN results are collected on a single server and aggregated to produce KNNs with respect to the entire dataset. The tweet recommender processor only needs to interact with the aggregate function (via the REST API) to obtain recommended tweets.

The following snippet details the implementation of the local KNN search. The key is to apply the KNN search to local data only, using the “PartitionRegionHelper.getLocalDataForContext” method.

  public class KNNParallel extends FunctionAdapter implements Declarable {

      public static final String ID = "KNNParallel";

      @Override
      public boolean isHA() {
          return false;
      }

      @Override
      public String getId() {
          return ID;
      }

      @Override
      public void init(Properties arg0) {

      }

      /**
       * Find K nearest neighbors of a given query vector based on Euclidean
       * distance. If data is partitioned over several servers, find 'local' KNNs
       * in parallel on each partition of the data. One can obtain KNNs with
       * respect to the entire dataset by simple post-processing of local KNN
       * results, cf. KNN.java.
       */
      @Override
      public void execute(FunctionContext arg0) {

          Object[] arguments = (Object[]) arg0.getArguments();
          assert arguments.length == 4 : "4 arguments are expected: K, idField, valueField, queryVector";
          int K = Integer.parseInt(arguments[0].toString().trim());
          String idField = arguments[1].toString().trim();
          String valueField = arguments[2].toString().trim();
          String queryVec = arguments[3].toString().trim();

          // KNN entities ordered by descending distance to the query point.
          PriorityQueue<PdxInstance> knn = new PriorityQueue<PdxInstance>(1, distComparator);

          Cache cache = CacheFactory.getAnyInstance();
          Region<Object, PdxInstance> region = ((RegionFunctionContext) arg0).getDataSet();

          // For a partitioned region, work on local data only.
          if (PartitionRegionHelper.isPartitionedRegion(region)) {
              region = PartitionRegionHelper.getLocalDataForContext((RegionFunctionContext) arg0);
          }

          if (region.size() > 0) {
              Iterator<PdxInstance> itr = region.values().iterator();
              while (itr.hasNext()) {
                  PdxInstance data = itr.next();
                  double dist = euclidean(queryVec, ((String) data.getField(valueField)).trim());
                  // Add a new serialized KNN entity to the queue.
                  PdxInstance knnEntity = KNNEntityToPdxInstance(cache, idField, data.getField(idField).toString(),
                          valueField, ((String) data.getField(valueField)).trim(), dist);
                  knn.offer(knnEntity);
                  // If more than K entities in the queue, remove the one
                  // with the maximum distance to the query point.
                  if (knn.size() > K) {
                      knn.poll();
                  }
              }
          }

          // Return K serialized KNN entities. Serialization and conversion of
          // PriorityQueue to ArrayList is to enable JSON
          // conversion, so that the function can be invoked through REST API.
          arg0.getResultSender().lastResult(new ArrayList<>(knn));
      }

      /**
       * Euclidean distance.
       *
       * @param vec1
       *            Space separated numerical string.
       * @param vec2
       *            Space separated numerical string.
       * @return Euclidean distance.
       */
      public double euclidean(String vec1, String vec2) {
          String[] v1 = vec1.split("\\s+");
          String[] v2 = vec2.split("\\s+");
          assert v1.length == v2.length : "euclidean(): Input vectors have different lengths.";
          double res = 0;

          for (int i = 0; i < v1.length; i++) {
              res += Math.pow((Double.parseDouble(v1[i]) - Double.parseDouble(v2[i])), 2);
          }
          return Math.sqrt(res);
      }

      /**
       * Comparator for serialized KNN entity comparison based on the distance to
       * the query point.
       */
      public static Comparator<PdxInstance> distComparator = new Comparator<PdxInstance>() {

          @Override
          public int compare(PdxInstance a, PdxInstance b) {
              double diff = (double) a.getField("dist") - (double) b.getField("dist");

              if (diff > 0) {
                  return -1;
              } else if (diff < 0) {
                  return 1;
              } else {
                  return 0;
              }
          }
      };

      /**
       * Create a PdxInstance for a KNN entity.
       *
       * @param gemfireCache
       *            GemFire cache.
       * @param idField
       *            Name of the ID field of a KNN entity.
       * @param id
       *            ID of a KNN entity.
       * @param valueField
       *            Name of the value field of a KNN entity.
       * @param value
       *            Vector representation of a KNN entity with the vector
       *            represented by a space separated string.
       * @param dist
       *            Distance to the query point.
       * @return PdxInstance of a KNN entity.
       */
      protected PdxInstance KNNEntityToPdxInstance(Cache gemfireCache, String idField, String id, String valueField,
              String value, Double dist) {
          PdxInstanceFactory pdxInstanceFactory = gemfireCache
                  .createPdxInstanceFactory("io.pivotal.ds.gemfire.entity.KNNEntity");
          pdxInstanceFactory.writeString(idField, id);
          pdxInstanceFactory.writeString(valueField, value);
          pdxInstanceFactory.writeDouble("dist", dist);
          return pdxInstanceFactory.create();
      }
  }

The local KNN search is initiated by the aggregate function. The use of the “FunctionService.onRegion” method, as shown in the code below, instructs GemFire to run the KNN search only on the servers hosting the relevant data. The local results are collected through a “ResultCollector” for final processing.

  public class KNN extends FunctionAdapter implements Declarable {

      public static final String ID = "KNN";

      @Override
      public boolean isHA() {
          return false;
      }

      @Override
      public String getId() {
          return ID;
      }

      @Override
      public void init(Properties arg0) {

      }

      /**
       * Find K nearest neighbors of a given query vector based on Euclidean
       * distance.
       */
      @Override
      public void execute(FunctionContext arg0) {
          Object[] arguments = (Object[]) arg0.getArguments();
          assert arguments.length == 5 : "5 arguments are expected: regionName, K, idField, valueField, queryVector";

          String regionName = arguments[0].toString().trim();
          int K = Integer.parseInt(arguments[1].toString().trim());

          Cache cache = CacheFactory.getAnyInstance();
          Region<Object, PdxInstance> region = cache.getRegion(regionName);

          Execution exe = FunctionService.onRegion(region).withArgs(Arrays.copyOfRange(arguments, 1, 5));
          // Find 'local' KNNs on each partition of the data in parallel. If data
          // is not partitioned, return KNNs with respect to the entire dataset.
          ResultCollector<?, ?> resultCollector = exe.execute("KNNParallel");

          @SuppressWarnings("unchecked")
          ArrayList<ArrayList<PdxInstance>> result = (ArrayList<ArrayList<PdxInstance>>) resultCollector.getResult();

          // Consolidate local KNN results.
          PriorityQueue<PdxInstance> knn = new PriorityQueue<PdxInstance>(1, distComparator);
          for (int i = 0; i < result.size(); i++) {
              for (int j = 0; j < result.get(i).size(); j++) {
                  knn.offer(result.get(i).get(j));
                  // If more than K entities in the queue, remove the one
                  // with the maximum distance to the query point.
                  if (knn.size() > K) {
                      knn.poll();
                  }
              }
          }

          arg0.getResultSender().lastResult(new ArrayList<>(knn));

      }

      /**
       * Comparator for serialized KNN entity comparison based on the distance to
       * the query point.
       */
      public static Comparator<PdxInstance> distComparator = new Comparator<PdxInstance>() {

          @Override
          public int compare(PdxInstance a, PdxInstance b) {
              double diff = (double) a.getField("dist") - (double) b.getField("dist");

              if (diff > 0) {
                  return -1;
              } else if (diff < 0) {
                  return 1;
              } else {
                  return 0;
              }
          }
      };
  }

The aggregate function is expected to run on a single server (which can be any server in the cluster). That server essentially acts as a master node for the MapReduce process. The following is an example REST API call to the KNN function:

jyu@:Server$ curl -H "Content-Type: application/json" -X POST -d '[{"@type":string,"@value":"WordVec"},{"@type":string,"@value":"2"},{"@type":string,"@value":"word"},{"@type":string,"@value":"wordvec"},{"@type":string,"@value":"3.5 4"}]' http://localhost:15000/gemfire-api/v1/functions/KNN?onMembers=server0
[ [ {
  "word" : "2",
  "wordvec" : "2 2",
  "dist" : 2.5
}, {
  "word" : "3",
  "wordvec" : "3 6",
  "dist" : 2.0615528128088303
} ] ]

Deploying GemFire Microservices on PCF

Deploying GemFire microservices on PCF involves uploading a configuration zip file to the GemFire cluster on PCF. The zip file should contain the microservice implementation jars, any dependency jars, cache.xml, and a cluster configuration file if needed. The “cf restart-gemfire” command can then start the GemFire cluster with the uploaded configuration, as shown below:

jyu@:config$ cf restart-gemfire dsmicro-gemfire --cluster-config dsmicroGemFireConfig.zip --rest-api=enable
This command could take a while. Timeout set to: 900 seconds.
....................URL for GemFire's REST API is: gemfire-dsmicro-gemfire.run.haas-23.pez.pivotal.io/gemfire-api/v1

Cluster successfully restarted

Note that the GemFire REST API is enabled, which provides immediate access to all the implemented microservices (server-side functions). The following example REST API call executes an in-memory KNN search on PCF:

curl -H "Content-Type: application/json" -X POST -d '[{"@type":string,"@value":"SentimentScore"},{"@type":string,"@value":"2"},{"@type":string,"@value":"phrase"},{"@type":string,"@value":"score"},{"@type":string,"@value":"1.5"}]' http://gemfire-dsmicro-gemfire.run.haas-23.pez.pivotal.io/gemfire-api/v1/functions/KNN?onMembers=cacheserver0

Summary

In this project, we explored and implemented three sets of data microservices that help a data scientist’s workflow in deploying a model into production.

  • The first data microservice helps a data scientist push a trained model from an MPP environment (Greenplum DB or HDB) to an in-memory data grid (GemFire), where model scoring happens on incoming real-time data.

  • The second set of data microservices helps a data scientist set up a data pipeline using Spring Cloud Dataflow components. This set of SCDF components, acting as data sources, processors and sinks, can help the data scientist transform the data sources in the way needed for model scoring. Furthermore, this kind of data pipeline can be used by data scientists to gather data from production workflows for developing models.

  • The third set of data microservices scores incoming feature vectors in real time and is implemented in an in-memory data grid (GemFire) for scalability. If the model requires caching of features and multiple data sources for feature extraction, then the feature extraction also needs to happen here before scoring.

