Zero Downtime Data Migration Between GemFire Clusters Using Apache Kafka and Spring Cloud Stream

November 16, 2017

Authored by Partho Bardhan and Niranjan Sarvi.  

With the increase in online security breaches in recent years, it has become imperative to protect data from theft. As a result, enterprises are investing heavily in security.

As part of their strategic goals in the year 2017, one Pivotal customer wanted to secure their GemFire clusters with SSL. This required a migration from the existing, unsecured GemFire cluster to a new, SSL-secured GemFire cluster. The customer had several critical customer-facing applications running 24/7 against the existing GemFire cluster, making thousands of updates every second. Hence, it was vital that the applications and both the old and new clusters had no downtime during the security upgrade.

This blog post describes how Pivotal accomplished the zero-downtime data transfer with an initial data export and import, followed by a publish-subscribe mechanism that continuously transferred changes between the clusters, built with Apache Kafka and Spring Cloud Stream applications.

 

Problem

Until recently, upgrading an existing, unsecured GemFire cluster with SSL required a full restart of the cluster. A full restart causes downtime, negatively impacting the many customer-facing, mission-critical applications that rely on the cluster for data storage and processing. Also, the components traditionally used for data migration between GemFire clusters, such as WAN gateway replication and client continuous query or interest-based replication, cannot transmit data between an unsecured cluster and an SSL-secured cluster. And a rolling restart that adds SSL to each node in turn results in a mix of SSL-secured and unsecured components that cannot communicate, making this approach a non-starter.

 

Solution

Pivotal developed an architecture that enables bi-directional flow of data between an existing, unsecured GemFire cluster and a new, SSL-secured GemFire cluster. The two clusters maintain data consistency while the applications are gradually migrated from the old to the new, secure cluster.

 

Key components

Some key components in the architecture are:

Cache Listener

A cache listener is a server-side utility in GemFire that tracks changes to regions and data entries through event objects. The listener is extended to capture CRUD events resulting from operations on the unsecured GemFire cluster, extract the data from an event object, convert the data to a JSON message, and send the message to Apache Kafka.

Apache Kafka Topics

Apache Kafka is publish-subscribe messaging middleware that stores the incoming messages from the cache listeners in logical containers called topics.

Spring Cloud Stream Applications

Spring Cloud Stream is a framework for building message-driven microservices. It is built on Spring Boot for creating microservices applications and Spring Integration for connecting to messaging middleware like Kafka. These externally configurable Spring Cloud Stream applications consume messages from Kafka topics, convert them to objects, and write them to the secured GemFire cluster.

The solution includes an initial export of data from the existing, unsecured GemFire cluster, then import to the new, secure GemFire cluster. While the initial data transfer is in progress, cache listeners running on the cluster regions convert all events to messages and publish them to Kafka topics. After the data is imported successfully to the new GemFire cluster, the Spring Cloud Stream applications begin to consume the messages that are queued up on Kafka. The architecture is kept running to ensure messages flow continuously between clusters. Once the data is consistent between the clusters, the applications are gradually migrated over to the SSL-secured GemFire cluster. Finally, the unsecured cluster is retired.

 

Implementation

  1. Initial setup

The customer’s production cluster was running GemFire 8.2.0. A new cluster was set up with GemFire 8.2.5, with SSL enabled. The new cluster included similar region definitions to the existing cluster, but with no data or clients. For easy reference, we will refer to the live, unsecured cluster as oldCluster and the new, secured cluster as newCluster. A Kafka cluster was also created and secured with SSL.

  2. Create topics on Kafka to store messages from oldCluster to newCluster

A Kafka topic was created for each region in the oldCluster. The topics were named using the format <regionName>-<source-cluster>-to-<destination-cluster>. For example, the topic created on Kafka for a region named region1 was region1-oldCluster-to-newCluster, which indicates that the topic stored messages published from region1 on oldCluster, to be consumed and applied to newCluster.
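
The naming convention can be captured in a small helper. The following is only an illustrative sketch: the function names topic_name and topics_for_regions, and the region names other than region1, are assumptions made for this example and do not come from the customer's code.

```python
def topic_name(region_name, source_cluster, destination_cluster):
    """Build a name in the <regionName>-<source-cluster>-to-<destination-cluster> format."""
    return f"{region_name}-{source_cluster}-to-{destination_cluster}"


def topics_for_regions(region_names, source_cluster, destination_cluster):
    """Return a mapping of region name to topic name, one topic per region."""
    return {
        name: topic_name(name, source_cluster, destination_cluster)
        for name in region_names
    }


# Hypothetical region list; only region1 appears in the text above.
forward_topics = topics_for_regions(
    ["region1", "customer", "address"], "oldCluster", "newCluster"
)
print(forward_topics["region1"])
```

Swapping the last two arguments yields the names for the reverse direction, so the same helper covers both sets of topics.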

  3. Create a region to hold the timestamp of operations

This architecture required both oldCluster and newCluster to be up and running simultaneously while data and applications were migrated, so multiple operations could occur against the same key on both clusters. This called for a way to keep only the latest operation on both clusters and to reject operations with older timestamps. To achieve this, a region called latestTimestamp was created on both clusters. The latestTimestamp region stored a combination of the entryKey and regionName as its key and the timestamp (in milliseconds) of the operation as its value. An operation on an existing key in the latestTimestamp region updated its value to the new operation timestamp.

LatestTimestamp Region

Key                                            Value
{entryKey: C23456, regionName: customer}       1504639250
{entryKey: C23457, regionName: customer}       1504639257
{entryKey: ADDR-C23456, regionName: address}   1504648391
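
As a rough model of this region, the sketch below uses a plain Python dict in place of a GemFire region and a named tuple for the composite key. The names TimestampKey and record_operation are assumptions for illustration, and the final timestamp (1504650000) is a made-up value showing an overwrite; the other values are the sample rows from the table.

```python
from typing import Dict, NamedTuple


class TimestampKey(NamedTuple):
    """Composite key: the entry key plus the region the entry belongs to."""
    entry_key: str
    region_name: str


# A plain dict stands in for the latestTimestamp region (assumption).
latest_timestamp: Dict[TimestampKey, int] = {}


def record_operation(entry_key: str, region_name: str, timestamp: int) -> None:
    """Store the timestamp of the latest operation, replacing any earlier value."""
    latest_timestamp[TimestampKey(entry_key, region_name)] = timestamp


record_operation("C23456", "customer", 1504639250)
record_operation("C23457", "customer", 1504639257)
record_operation("ADDR-C23456", "address", 1504648391)

# A later operation on an existing key overwrites the stored timestamp.
# 1504650000 is a hypothetical value chosen for this example.
record_operation("C23456", "customer", 1504650000)
print(latest_timestamp[TimestampKey("C23456", "customer")])
```

Including the region name in the key keeps entries from different regions apart even when they share the same entry key.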

 

  4. Deploy GemFire cache-listeners on both clusters

A cache listener was deployed to every region in both the oldCluster and newCluster to capture all operations that occurred.

The cache listener had the following functions:

  1. Capture create, update, delete and expiry events.

  2. From the event object, extract the following and create a JSON string with them:

    • operation

    • regionName

    • entryKey

    • value

    • timestamp

  3. Log the timestamp to the latestTimestamp region using a combination of entryKey and regionName for the key.

  4. Send the JSON string as a message to the Kafka topic.
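
The four functions above can be sketched in a few lines. This is not the GemFire listener itself: a dict stands in for the latestTimestamp region, a dict of lists stands in for a Kafka producer, and the names build_message and on_entry_event, the operation label "UPDATE", and the sample value are all assumptions made for illustration.

```python
import json

# Hypothetical stand-ins: a dict for the latestTimestamp region and
# one list per topic in place of a real Kafka producer.
latest_timestamp = {}
published = {}


def build_message(operation, region_name, entry_key, value, timestamp):
    """Serialize the five fields taken from the event into a JSON string."""
    return json.dumps({
        "operation": operation,
        "regionName": region_name,
        "entryKey": entry_key,
        "value": value,
        "timestamp": timestamp,
    })


def on_entry_event(operation, region_name, entry_key, value, timestamp,
                   source="oldCluster", destination="newCluster"):
    """Mimic the listener: build the JSON, log the timestamp, publish."""
    message = build_message(operation, region_name, entry_key, value, timestamp)
    # Log the operation time under the (entryKey, regionName) key.
    latest_timestamp[(entry_key, region_name)] = timestamp
    # Append to the topic named <region>-<source>-to-<destination>.
    topic = f"{region_name}-{source}-to-{destination}"
    published.setdefault(topic, []).append(message)
    return message


message = on_entry_event("UPDATE", "customer", "C23456", {"name": "Ada"}, 1504639250)
print(message)
```

In the real deployment the value would be whatever object the region stores, converted to JSON by the listener; the dict used here is just a placeholder.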

The cache.xml file in each node was updated to add the cache-listener and a rolling restart was performed to ensure the oldCluster had no downtime. As soon as a node was restarted, any operations happening on the entries on its regions were transformed into messages and published to the corresponding Kafka topic <region>-oldCluster-to-newCluster.

The operation is demonstrated in the diagram below.

  5. The initial data migration

GemFire offers optimized data export and import operations. The data in the regions on the oldCluster was exported to files on disk using the standard GemFire export and import commands. These files were then imported into newCluster.

During this time, all operations on oldCluster were transformed to JSON messages and published to their respective Kafka topics.
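
To see why queuing the operations matters, consider the toy model below. Dicts stand in for the two clusters and a list stands in for a Kafka topic; the keys, values, operation labels, and function names are all assumptions for illustration. The snapshot is taken first, operations keep arriving on the old cluster, and replaying the queued operations after the import brings the new cluster to the same state.

```python
# Toy stand-ins (assumptions): dicts for the clusters, a list for a Kafka topic.
old_cluster = {"C1": "v1", "C2": "v1"}
new_cluster = {}
topic = []


def operate_on_old(operation, key, value=None):
    """Apply an operation to the old cluster and queue it, as the listener would."""
    if operation == "DESTROY":
        old_cluster.pop(key, None)
    else:
        old_cluster[key] = value
    topic.append((operation, key, value))


def replay(messages, target):
    """Apply queued operations, in order, to the target cluster."""
    for operation, key, value in messages:
        if operation == "DESTROY":
            target.pop(key, None)
        else:
            target[key] = value


snapshot = dict(old_cluster)            # export starts
operate_on_old("UPDATE", "C1", "v2")    # traffic continues during the transfer
operate_on_old("CREATE", "C3", "v1")
operate_on_old("DESTROY", "C2")
new_cluster.update(snapshot)            # import finishes
replay(topic, new_cluster)              # queued operations are applied afterwards
print(new_cluster == old_cluster)
```

Because the listeners were already publishing before the export began, an operation may appear both in the snapshot and on the topic. Replaying it is harmless in this model, since applying the same put or delete twice leaves the same result.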

  6. Start subscribers to consume messages from Kafka

After the initial data was migrated, the operations that occurred during the migration on oldCluster had to be applied to the newCluster. These changes were retained on Kafka topics. Spring Cloud Stream apps were deployed to consume these messages.

An application with the following functions was deployed for each topic:

  1. Consume each message on the Kafka topic.

  2. Using the entryKey and regionName from the message, check whether a timestamp exists for that key in the latestTimestamp region on newCluster. If the stored timestamp is more recent than the message's timestamp, the message from oldCluster is stale because a more recent operation has already happened on newCluster. In that case, reject the message and proceed to the next one. Otherwise, continue processing the message.

  3. Extract the value object from the message.

  4. Put the value object into the relevant GemFire region on the newCluster.

  5. Update the latestTimestamp region on newCluster using the entryKey and regionName from the message.
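
The subscriber's decision logic might look roughly like the sketch below. It is a simplified model rather than the Spring Cloud Stream application itself: dicts stand in for the GemFire regions, and the function name handle_message and the sample data are assumptions. Two details are also assumptions, because the steps above do not spell them out: messages whose operation is "DESTROY" or "EXPIRE" remove the entry instead of putting it, and the latestTimestamp region is updated with the timestamp carried in the message.

```python
import json


def handle_message(raw_message, regions, latest_timestamp):
    """Apply one Kafka message to the target cluster unless it is stale.

    regions and latest_timestamp are plain dicts standing in for GemFire
    regions (assumption). Returns True if applied, False if rejected.
    """
    message = json.loads(raw_message)
    ts_key = (message["entryKey"], message["regionName"])

    # Step 2: reject the message if the target has seen a more recent operation.
    stored = latest_timestamp.get(ts_key)
    if stored is not None and stored > message["timestamp"]:
        return False

    # Steps 3 and 4: extract the value and write it to the matching region.
    region = regions.setdefault(message["regionName"], {})
    if message["operation"] in ("DESTROY", "EXPIRE"):
        # Assumption: removals are mirrored as removals on the target.
        region.pop(message["entryKey"], None)
    else:
        region[message["entryKey"]] = message["value"]

    # Step 5: record the operation time (assumed to be the message timestamp).
    latest_timestamp[ts_key] = message["timestamp"]
    return True


regions = {}
timestamps = {("C23456", "customer"): 1504639257}
stale = json.dumps({"operation": "UPDATE", "regionName": "customer",
                    "entryKey": "C23456", "value": {"name": "old"},
                    "timestamp": 1504639250})
print(handle_message(stale, regions, timestamps))
```

A message with a timestamp equal to the stored one is applied in this sketch, following the rule that only a more recent stored value causes a rejection.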

The following diagram demonstrates the flow described above:

 

The initial export-import and the above architecture handled a continuous flow of events to ensure that data was consistent between the oldCluster and the newCluster.

The next step was to gradually move applications over to the new cluster. Running the applications against both clusters simultaneously in production required a reverse flow of messages from newCluster to oldCluster, so that both clusters would stay consistent.

  7. Create topics on Kafka to store messages from newCluster to oldCluster

These topics stored messages published by cache listeners running on regions on the newCluster. They followed the same naming standard as before. So, for a region named region1, the topic created was region1-newCluster-to-oldCluster.

  8. Start subscribers to consume messages from newCluster to oldCluster on Kafka topics

With data already in newCluster, the cache-listeners started publishing messages to their respective Kafka topics. A second set of Spring Cloud Stream applications was deployed to consume these messages and perform the operations on the oldCluster.

  9. Migrate applications to newCluster

When the data in oldCluster and newCluster became consistent, the applications were gradually migrated over. During the gradual migration, applications would be running against both clusters. The bi-directional flow is represented by the blue and dotted-red arrows in the diagram below. Finally, all applications were moved over to the newCluster.

  10. Prevent an infinite loop of messages

GemFire cache-listeners are fired for all entry events on the cluster. In this architecture, any operation on a cluster is transmitted to the destination cluster, where it invokes the cache-listener running there. That listener sends a message back to the source cluster, where the operation originated, and so on, leading to an infinite loop of messages between the clusters.

A technique similar to reverse path forwarding, which is commonly used to ensure loop-free forwarding of multicast packets in multicast routing, was used to prevent the infinite loop. During the put operation, the subscriber added an additional callback argument to identify the source of the message as ‘Kafka’. On the cache-listener, any incoming event that had the callback argument ‘Kafka’ was not processed. This meant that any operation on a cluster was sent as a message, but upon reaching the destination cluster the source was identified as ‘Kafka’ and no new message was generated for it.
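
The effect of the callback argument can be demonstrated with a small simulation. Everything here is a stand-in: the Cluster class, its outbox list (in place of a Kafka topic), and the drain function (in place of a subscriber) are assumptions written for this example, not GemFire or Spring Cloud Stream code.

```python
KAFKA_ORIGIN = "Kafka"  # marker the subscriber attaches to its puts


class Cluster:
    """Toy cluster: a dict of entries plus an outbox standing in for a Kafka topic."""

    def __init__(self, name):
        self.name = name
        self.data = {}
        self.outbox = []

    def put(self, key, value, callback_arg=None):
        self.data[key] = value
        self._after_update(key, value, callback_arg)

    def _after_update(self, key, value, callback_arg):
        """Listener: skip events that arrived through Kafka, publish the rest."""
        if callback_arg == KAFKA_ORIGIN:
            return
        self.outbox.append((key, value))


def drain(source, destination):
    """Subscriber: apply queued messages to the destination, tagged as from Kafka."""
    delivered = 0
    while source.outbox:
        key, value = source.outbox.pop(0)
        destination.put(key, value, callback_arg=KAFKA_ORIGIN)
        delivered += 1
    return delivered


old = Cluster("oldCluster")
new = Cluster("newCluster")
old.put("C23456", "v1")        # an application writes to oldCluster
forwarded = drain(old, new)    # the change is carried over to newCluster
echoed = drain(new, old)       # nothing is sent back
print(forwarded, echoed)
```

Without the check in _after_update, the put on newCluster would queue a message for oldCluster, whose put would queue another for newCluster, and the exchange would never end.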

  11. Remove cache listeners, stop subscribers and retire oldCluster

Once the applications were moved over to newCluster, the cache-listeners were dropped from both clusters. The following command, run in gfsh, the command-line utility for GemFire, drops the cache listener from region1:

alter region --name=/region1 --cache-listener

When all messages were consumed from Kafka, the subscribers were stopped. The oldCluster was retired.

 

Other applications

Many message-driven data movement use cases can be solved using a combination of Spring Cloud Stream and messaging middleware like RabbitMQ or Kafka. Find an example of migrating data and applications between two relational databases using modern data pipelines here.

Pivotal also offers a toolkit for building data integration and real-time data processing pipelines using Spring Cloud Data Flow. Apart from data movement, Spring Cloud Data Flow is suitable for a range of data processing use cases, from import/export to event streaming and predictive analytics.
