Dec 6, 2024

Acceptable and fast user entity resolution using Dataflow

Learn what entity resolution is and how to implement it effectively using Dataflow. Check out insights from our developers and practical steps to help you improve data quality.

Entity resolution is a well-established and prevalent challenge in software engineering. Many organizations face the issue of having user-related data dispersed across various databases and systems. 

This fragmentation can lead to significant difficulties when analyzing and correlating user demographics and activities across different data sources, particularly in analytics. Entity resolution consolidates and groups information about the same users.


What is entity resolution?

Entity Resolution (ER) is the process of identifying and merging records that refer to the same real-world entities across different data sources. Generally speaking, it helps to create a unified and accurate view of data by linking data points in relational data, eliminating duplicates, and improving overall data quality.


What is the difference between record linkage and entity resolution?

Entity resolution is a form of graph traversal of scattered user information. 

For example, if a user has two login accounts with different emails but the same home address, these accounts are highly likely to be connected. In other words, these accounts (nodes) can be traversed through their shared-address relationships (edges).

In our situation, the same users share the exact same emails or IP addresses but have different osIds (the per-installation UUIDs our software generates), because an osId is recreated after reinstallation. The purpose of this entity resolution is to traverse these scattered osIds through PII and connect them.


Why is entity resolution useful? 

In a startup, it is incredibly important for us to accurately track our users' behavior, helping us understand what is working and what isn’t when it comes to product releases, growth activities, and the general software lifecycle. When I joined Pieces, I encountered this issue firsthand. 

One of my initial projects involved accurately capturing user activity events. At that time, the existing approach for user activity analytics involved sending events to the analytics platform via Segment, followed by manual filtering on the analytics side. 

Each event was tagged with a UUID generated by our software, which was primarily used to track activity on a per-user basis.

However, this method presented several challenges:

Firstly, the event schema was not standardized, as Segment accepts virtually any JSON format. This resulted in multiple instances within the client codebase where Segment events were being sent with varying schemas. 

Secondly, when a user uninstalls and reinstalls our software, a new UUID is generated, causing events associated with the new UUID to be mistakenly counted as activity from a new user.

While the first issue could be addressed by standardizing the schema and client-side code, we still faced the challenge of analyzing events that had already been sent. This required developing a strategy to retrospectively resolve and consolidate user data to ensure accurate analytics.

We selected Google Cloud Dataflow to manage and process over three years of user activity event data. This dataset contains more than 200 million events, with the volume set to grow significantly in the future. 

Since our tech stack is heavily based on Google Cloud services, Dataflow was a natural fit, allowing us to build a highly scalable pipeline to aggregate and transform raw event data sourced from Mixpanel.

In the initial phase of the project, our primary goal was to identify each unique user by their universally unique identifier (UUID), which we refer to as "osId" (we’ll use this term going forward). 

Our user activity data resided exclusively on the analytics platform, so to enable Dataflow to perform efficient, scalable batch processing, we needed to transfer this data to Google Cloud Storage (GCS).

To achieve this, we created a scheduled job on Cloud Run, which retrieves data files from the analytics platform and stores them in GCS. (We’ll cover the specifics of this technique in another blog post, but for now, assume that all raw JSON events are available in GCS.)

As mentioned, osIds could appear in various parts of the raw JSON data and are not marked with consistent property names. 

To accurately locate the correct osIds, we developed a set of assumptions:

  1. If the UUID appears multiple times across different events historically, it is more likely to be an osId.

  2. If the UUID has appeared historically with "os" as its key or parent key, it is also more likely to be an osId.

Based on these assumptions, we created the following strategy to find the osId:

  1. Identify Potential UUIDs in Event JSONs:

  • For each event JSON, scan for all strings that match a UUID pattern using regex.

  • For each matching string, check if the "os" key appears either as a direct or parent key.

  • Construct an entity of PotentialUUID(uuid=..., isOsPath=...) for each matching UUID.

  2. Aggregate Historical UUID Data:

  • Aggregate all PotentialUUID entities across historical event JSONs.

  • Count the number of historical occurrences for each UUID and track whether it has ever appeared with an "os" key.

  • After aggregation, you will have a PotentialUUID(uuid=..., numberOfHistoricalOccurrences=..., everAppearedAsOs=...).

  3. Attach Most Likely osId to Each Event:

  • For each user activity event, extract all regex-matching UUIDs.

  • Join these UUIDs with the aggregated PotentialUUID data.

  • Rank the PotentialUUID candidates by:

    • Whether they have everAppearedAsOs.

    • Their numberOfHistoricalOccurrences.

  • Attach the highest-ranked PotentialUUID as the osId to the event.
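The three steps above can be sketched with plain Scala collections. This is a minimal local illustration and not our Dataflow code: the tiny Json type, the Sighting and UuidStats case classes, and the function names are all hypothetical stand-ins, and a real pipeline would parse events with a JSON library and run each step as a distributed transform.

```scala
import scala.util.matching.Regex

object OsIdCandidates {
  // Minimal stand-in for a parsed event JSON (hypothetical; use a JSON library in practice).
  sealed trait Json
  final case class JStr(value: String) extends Json
  final case class JObj(fields: Map[String, Json]) extends Json
  final case class JArr(items: List[Json]) extends Json

  // One UUID seen in one event, and the historical aggregate for a UUID.
  final case class Sighting(uuid: String, isOsPath: Boolean)
  final case class UuidStats(uuid: String, occurrences: Int, everAppearedAsOs: Boolean)

  private val UuidPattern: Regex =
    "[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}".r

  // Step 1: walk the event and record whether "os" is the direct or parent key of each UUID.
  // keyPath holds the enclosing keys, innermost first.
  def sightings(json: Json, keyPath: List[String] = Nil): List[Sighting] = json match {
    case JStr(s) =>
      val underOs = keyPath.take(2).contains("os")
      UuidPattern.findAllIn(s).toList.map(u => Sighting(u.toLowerCase, underOs))
    case JObj(fields) =>
      fields.toList.flatMap { case (k, v) => sightings(v, k :: keyPath) }
    case JArr(items) =>
      items.flatMap(sightings(_, keyPath))
  }

  // Step 2: count the events each UUID appears in and whether it was ever seen under "os".
  def aggregate(events: Seq[Json]): Map[String, UuidStats] =
    events
      .flatMap { e =>
        // Collapse repeated sightings inside one event so each event counts once per UUID.
        sightings(e).groupBy(_.uuid).map { case (u, ss) => Sighting(u, ss.exists(_.isOsPath)) }
      }
      .groupBy(_.uuid)
      .map { case (uuid, perEvent) =>
        uuid -> UuidStats(uuid, perEvent.size, perEvent.exists(_.isOsPath))
      }

  // Step 3: rank the candidates of one event: "os"-keyed UUIDs first, then the most frequent.
  def mostLikelyOsId(event: Json, stats: Map[String, UuidStats]): Option[String] =
    sightings(event).map(_.uuid).distinct
      .flatMap(stats.get)
      .sortBy(s => (!s.everAppearedAsOs, -s.occurrences))
      .headOption
      .map(_.uuid)
}
```

Sorting on the tuple (not everAppearedAsOs, negative occurrences) puts UUIDs that were ever seen under an "os" key ahead of all others, and breaks ties by historical frequency, which matches the ranking order listed above.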

We chose Scala along with Scio, a Scala API for Apache Beam whose pipelines can run on Google Cloud Dataflow, to build our data processing pipeline. This combination is well-suited to processing large datasets and allows for efficient parallel computation.

The code snippet below shows how we implemented a custom aggregator to manage and aggregate potential osId data across various attributes. The PotentialOsIdAggregator class extends the Aggregator trait, defining the logic for preparing, merging, and presenting aggregated data.

import com.spotify.scio.values.SCollection
import com.twitter.algebird.{Aggregator, Semigroup}

class PotentialOsIdAggregator extends Aggregator[PotentialOsId, PotentialOsId, PotentialOsId] {

  // Defines a custom semigroup for merging two PotentialOsId instances
  implicit val osIdSemigroup: Semigroup[PotentialOsId] = (l: PotentialOsId, r: PotentialOsId) => (l + r)

  // Specifies how to prepare input data for aggregation
  override def prepare(input: PotentialOsId): PotentialOsId = input

  // Provides the defined semigroup for combining inputs
  override def semigroup: Semigroup[PotentialOsId] = osIdSemigroup

  // Finalizes and presents the aggregated result
  override def present(reduction: PotentialOsId): PotentialOsId = reduction
}

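// Case class representing potential osId details with fields for emails, IP addresses, etc.
// Note: minTimestamp defaults to 0, so callers should pass the real event timestamp;
// otherwise Math.min in `+` always yields 0.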
case class PotentialOsId(potentialOsId: String,
                         emails: Set[String],
                         ipAddresses: Set[String],
                         userNames: Set[String],
                         numOccurrences: Int = 1,
                         minTimestamp: Long = 0,
                         isOsUser: Boolean) {

  // Merges two PotentialOsId instances by combining fields
  def +(other: PotentialOsId): PotentialOsId = {
    PotentialOsId(
      potentialOsId = this.potentialOsId,
      emails = this.emails ++ other.emails,
      ipAddresses = this.ipAddresses ++ other.ipAddresses,
      userNames = this.userNames ++ other.userNames,
      numOccurrences = this.numOccurrences + other.numOccurrences,
      minTimestamp = Math.min(this.minTimestamp, other.minTimestamp),
      isOsUser = this.isOsUser || other.isOsUser
    )
  }
}

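// Aggregates osId data by grouping by potentialOsId key.
// `potentialOsIds` is assumed to be an SCollection[PotentialOsId] available in the enclosing scope.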
def aggregatePotentialOsIdStats(): SCollection[PotentialOsId] = potentialOsIds.keyBy(
  _.potentialOsId
).aggregateByKey(
  new PotentialOsIdAggregator
).values

Here’s a breakdown of each part:

  1. PotentialOsIdAggregator: This class aggregates PotentialOsId objects, leveraging a custom-defined Semigroup to specify how two PotentialOsId instances should be combined. This process includes merging attributes like emails, IP addresses, and usernames, along with maintaining a count of occurrences and a minimum timestamp.

  2. PotentialOsId Case Class: This case class models each potential osId and its associated metadata. The + method defines the logic for merging two instances of PotentialOsId, which the aggregator uses to incrementally build the combined results.

  3. aggregatePotentialOsIdStats Function: This function groups the potentialOsId data by unique osId values and then applies the custom aggregator to compute aggregate statistics. The result is an SCollection of combined PotentialOsId objects, which can be further processed or stored.

This setup is highly efficient, as Scio allows us to parallelize data processing in Dataflow, scaling seamlessly to accommodate our large dataset.
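To make the merge semantics concrete, here is a small local sketch of what the keyed aggregation computes, written with plain Scala collections instead of Scio. The case class mirrors the one above so the snippet compiles on its own; the LocalMergeSketch object and aggregateLocally function are hypothetical names used only for illustration.

```scala
object LocalMergeSketch {
  // Mirrors the article's PotentialOsId so that this sketch is self-contained.
  final case class PotentialOsId(
      potentialOsId: String,
      emails: Set[String],
      ipAddresses: Set[String],
      userNames: Set[String],
      numOccurrences: Int = 1,
      minTimestamp: Long = 0,
      isOsUser: Boolean) {

    // Same merge rule as above: union the sets, add the counts,
    // keep the earliest timestamp, and OR the "os" flag.
    def +(other: PotentialOsId): PotentialOsId = copy(
      emails = emails ++ other.emails,
      ipAddresses = ipAddresses ++ other.ipAddresses,
      userNames = userNames ++ other.userNames,
      numOccurrences = numOccurrences + other.numOccurrences,
      minTimestamp = Math.min(minTimestamp, other.minTimestamp),
      isOsUser = isOsUser || other.isOsUser
    )
  }

  // Local counterpart of keyBy(_.potentialOsId).aggregateByKey(...):
  // group by the key, then fold each group with the merge rule.
  def aggregateLocally(records: Seq[PotentialOsId]): Map[String, PotentialOsId] =
    records.groupBy(_.potentialOsId).map { case (id, rs) => id -> rs.reduce(_ + _) }
}
```

Because the merge is associative and commutative, the records for a key can be combined in any order and in partial batches, which is what allows the distributed version to combine results on each worker before shuffling.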

While aggregating PotentialOsId data, we also capture users' personally identifiable information (PII), such as emails and IP addresses. This information is essential for grouping related identifiers, helping us build a cohesive view of user activities across events.

The diagram below visually represents the aggregation process, showcasing how raw event data is transformed as we consolidate various attributes into a single PotentialOsId record.

With our historical event data now containing accurate osId values and corresponding PotentialOsId objects that include associated personally identifiable information (PII), we’re ready to address the next challenge: users who receive a different osId upon reinstalling the app. 

To tackle this, we use the data we've gathered on UUIDs, usernames, and emails to establish mappings between these identifiers, creating a network of relationships that allows us to resolve user entities even when UUIDs differ.

This solution involves building a graph where:

  • Nodes represent UUIDs, usernames, and emails.

  • Edges connect nodes that share a relationship, like a UUID linked to an email or username based on historical data.

For instance, two UUIDs may be connected if they share the same email address or username, indicating they likely belong to the same user. By finding connected components in this graph, we can cluster related UUIDs and resolve users across different UUIDs.
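For reference, the exhaustive version of this idea can be sketched in memory with a union-find structure. This is only a local baseline to show what "connected components" means on such a graph, not what our Dataflow job runs; the object and function names are hypothetical, and each edge is assumed to be a (UUID, PII value) pair.

```scala
object ConnectedComponentsSketch {
  // Each edge links a UUID node to a PII node (an email or a username).
  def components(edges: Seq[(String, String)]): Set[Set[String]] = {
    val parent = scala.collection.mutable.Map.empty[String, String]

    // Find the representative of a node, registering the node on first sight.
    def find(x: String): String = {
      val p = parent.getOrElseUpdate(x, x)
      if (p == x) x
      else {
        val root = find(p)
        parent(x) = root // path compression
        root
      }
    }

    // Union the two endpoints of every edge.
    edges.foreach { case (a, b) =>
      val (ra, rb) = (find(a), find(b))
      if (ra != rb) parent(ra) = rb
    }

    // Nodes with the same representative form one component.
    parent.keys.toList.groupBy(find).values.map(_.toSet).toSet
  }

  // Keep only the UUID nodes of each component; PII nodes act as connectors.
  def uuidClusters(edges: Seq[(String, String)]): Set[Set[String]] = {
    val uuids = edges.map(_._1).toSet
    components(edges).map(_.intersect(uuids))
  }
}
```

With edges such as (osId1, email1), (osId2, email1), (osId2, userA), and (osId3, userA), all three osIds fall into one cluster even though osId1 and osId3 share no PII directly.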

Simplifying the graph traversal in Dataflow

Typically, finding connected components is computationally expensive, because it requires repeatedly following node connections until no new links are found, and the number of iterations is not known in advance.

In addition, a Dataflow pipeline is a Directed Acyclic Graph (DAG) of transforms that is fixed when the job is built, so a data-dependent loop or recursive operation cannot be expressed directly.

Instead of an exhaustive traversal, we made a practical assumption: most connected osId components share PII and can be linked within two or three degrees of connection. This limited traversal is sufficient to find most connected components in our data.
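The effect of limiting the traversal depth can be illustrated locally. In this sketch, the neighbours map (each osId mapped to the osIds that directly share a PII element with it) and the withinHops function are hypothetical; the fixed hop count stands in for a fixed number of pipeline stages, since the depth has to be chosen before the job runs.

```scala
object BoundedHops {
  // Returns every osId reachable from `start` in at most `hops` steps.
  // The loop bound is a constant chosen up front, not a run-until-converged loop.
  def withinHops(neighbours: Map[String, Set[String]], start: String, hops: Int): Set[String] =
    (1 to hops).foldLeft(Set(start)) { (reached, _) =>
      reached ++ reached.flatMap(id => neighbours.getOrElse(id, Set.empty[String]))
    }
}
```

For a chain osId1, osId2, osId3, osId4 in which only adjacent osIds share PII, one hop from osId1 reaches osId2, two hops reach osId3, and three hops reach osId4. Components whose members are more than the chosen number of hops apart are therefore only partially linked, which is the trade-off this assumption accepts.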


Steps for Dataflow aggregation

To implement this solution in Dataflow, we followed these steps:

  1. Flat Mapping PotentialOsId and PII: For each osId, we create pairs with each PII element. For instance, if osId1 is associated with emails [email1@email.com, email2@email.com] and usernames [userName1, userName2], we generate pairs like:

    • (osId1, email1@email.com), (osId1, email2@email.com)

    • (osId1, userName1), (osId1, userName2)

  2. Aggregating by PII: Next, we aggregate all pairs by the PII element on the right side. This yields mappings such as:

    • (email1@email.com, [osId1, osId2…])

    • (userName1, [osId1, osId2…])

  3. Generating Pairs of osIds: For each list of osIds associated with a particular PII element, we generate all possible combinations. For instance, if a username is linked to [osId1, osId2, osId3], we create pairs like:

    • [osId1, osId2], [osId2, osId1], [osId2, osId3], etc.

  4. Aggregating by osId: We then aggregate these combinations by osId on the left side, ultimately yielding clusters such as:

    • (osId1, [osId1, osId2, osId3])

  5. Extracting Unique osIds: Finally, we extract each cluster's unique set of osIds.
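The five steps above can be sketched with plain Scala collections. This is a local illustration under simplifying assumptions, not the Scio job itself: OsIdPii is a hypothetical, trimmed-down record holding only emails and usernames, and each groupBy stands in for a keyed aggregation in the pipeline.

```scala
object PiiLinkingSketch {
  // Hypothetical, reduced view of a PotentialOsId record.
  final case class OsIdPii(osId: String, emails: Set[String], userNames: Set[String])

  // Steps 1 to 4: returns, for each osId, the osIds that share at least one PII element with it.
  def neighbourhoods(records: Seq[OsIdPii]): Map[String, Set[String]] = {
    // Step 1: flat-map each record into (osId, pii) pairs.
    val osIdToPii: Seq[(String, String)] =
      records.flatMap(r => (r.emails ++ r.userNames).toSeq.map(pii => r.osId -> pii))

    // Step 2: aggregate by the PII element on the right side.
    val piiToOsIds: Map[String, Set[String]] =
      osIdToPii.groupBy(_._2).map { case (pii, ps) => pii -> ps.map(_._1).toSet }

    // Step 3: generate ordered osId pairs per PII element
    // (self-pairs are kept so that every osId appears in its own cluster).
    val osIdPairs: Seq[(String, String)] =
      piiToOsIds.values.toSeq.flatMap(ids => for (a <- ids.toSeq; b <- ids.toSeq) yield a -> b)

    // Step 4: aggregate the pairs by the osId on the left side.
    osIdPairs.groupBy(_._1).map { case (id, ps) => id -> ps.map(_._2).toSet }
  }

  // Step 5: extract the unique sets of osIds.
  def clusters(records: Seq[OsIdPii]): Set[Set[String]] =
    neighbourhoods(records).values.toSet
}
```

A single pass like this links osIds that share a PII element directly. osIds that are connected only through an intermediate osId end up in overlapping clusters rather than one merged cluster, so covering two or three degrees of connection would mean repeating the pairing and aggregation a fixed number of times. Records with no PII at all do not appear in the output of this sketch.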

While the current implementation of this method doesn’t capture every possible connection, it effectively identifies the majority of related osId components, creating a robust entity resolution framework.


Managing costs and estimating performance

This Dataflow job is both efficient and cost-effective. Processing all historical data takes around 30 minutes and costs approximately $3, making it an excellent solution for our use case. 

As a result, we can now more accurately capture Pieces user activity events, allowing us to understand how our product is being used and make accurate assumptions and predictions about user behavior. 

I initially dumped the existing user activity data into a data warehouse (specifically BigQuery) to analyze it.

I spent significant time querying the data to come up with an algorithm for finding connected osIds.

Throughout this process, I had to jump between the BigQuery console in my browser and the actual Scala implementation in IntelliJ.

Pieces WPE helped me recall the queries I had created and translate them into Scala. For example, I was able to ask, “Do you remember the query to connect osIds through emails? If so, could you translate it into scio/scala?”

Pieces gave me almost perfect answers and saved a lot of time.
