At Netflix, to promote and recommend the content to users in the best possible way there are many Media Algorithm teams which work hand in hand with content creators and editors. Several of these algorithms aim to improve different manual workflows so that we show the personalized promotional image, trailer or the show to the user.
These media focused machine learning algorithms as well as other teams generate a lot of data from the media files, which we described in our previous blog, are stored as annotations in Marken. We designed a unique concept called Annotation Operations which allows teams to create data pipelines and easily write annotations without worrying about access patterns of their data from different applications.
Lets pick an example use case of identifying objects (like trees, cars etc.) in a video file. As described in the above picture
Notice that we cannot update the annotations from previous runs because we don’t know how many annotations a new algorithm run will result into. It is also very expensive for us to keep track of which annotation needs to be updated.
The goal is that when the consumer comes and searches for annotations of type Objects for the given video file then the following should happen.
Does this remind you of something? This seems very similar (not exactly same) to a distributed transaction.
Typically, an algorithm run can have 2k-5k annotations. There are many naive solutions possible for this problem for example:
Instead our challenge was to implement this feature on top of Cassandra and ElasticSearch databases because that’s what Marken uses. The solution which we present in this blog is not limited to annotations and can be used for any other domain which uses ES and Cassandra as well.
Marken’s architecture diagram is as follows. We refer the reader to our previous blog article for details. We use Cassandra as a source of truth where we store the annotations while we index annotations in ElasticSearch to provide rich search functionalities.
Our goal was to help teams at Netflix to create data pipelines without thinking about how that data is available to the readers or the client teams. Similarly, client teams don’t have to worry about when or how the data is written. This is what we call decoupling producer flows from clients of the data.
Lifecycle of a movie goes through a lot of creative stages. We have many temporary files which are delivered before we get to the final file of the movie. Similarly, a movie has many different languages and each of those languages can have different files delivered. Teams generally want to run algorithms and create annotations using all those media files.
Since algorithms can be run on a different permutations of how the media files are created and delivered we can simplify an algorithm run as follows
Given above we can describe the data model for an annotation operation as follows.
{
"annotationOperationKeys": [
{
"annotationType": "string", ❶
"annotationTypeVersion": “integer”,
"pivotId": "string",
"operationNumber": “integer” ❷
}
],
"id": "UUID",
"operationStatus": "STARTED", ❸
"isActive": true ❹
}
As you can see from the data model that the producer of an annotation has to choose an AnnotationOperationKey which lets them define how they want UPSERT annotations in an AnnotationOperation. Inside, AnnotationOperationKey the important field is pivotId and how it is generated.
Our source of truth for all objects in Marken in Cassandra. To store Annotation Operations we have the following main tables.
Since Cassandra is NoSql, we have more tables which help us create reverse indices and run admin jobs so that we can scan all annotation operations whenever there is a need.
Each annotation in Marken is also indexed in ElasticSearch for powering various searches. To record the relationship between annotation and operation we also index two fields
We provide three APIs to our users. In following sections we describe the APIs and the state management done within the APIs.
When this API is called we store the operation with its OperationKey (tuple of annotationType, annotationType Version and pivotId) in our database. This new operation is marked to be in STARTED state. We store all OperationIDs which are in STARTED state in a distributed cache (EVCache) for fast access during searches.
Users call this API to upsert the annotations in an Operation. They pass annotations along with the OperationID. We store the annotations and also record the relationship between the annotation IDs and the Operation ID in Cassandra. During this phase operations are in isAnnotationOperationActive = ACTIVE and operationStatus = STARTED state.
Note that typically in one operation run there can be 2K to 5k annotations which can be created. Clients can call this API from many different machines or threads for fast upserts.
Once the annotations have been created in an operation clients call FinishAnnotationOperation which changes following
This is the key part for our readers. When a client calls our search API we must exclude
To achieve above
Cassandra is our source of truth so if an error happens we fail the client call. However, once we commit to Cassandra we must handle Elasticsearch errors. In our experience, all errors have happened when the Elasticsearch database is having some issue. In the above case, we created a retry logic for updateByQuery calls to ElasticSearch. If the call fails we push a message to SQS so we can retry in an automated fashion after some interval.
In near term, we want to write a high level abstraction single API which can be called by our clients instead of calling three APIs. For example, they can store the annotations in a blob storage like S3 and give us a link to the file as part of the single API.
Data ingestion pipeline with Operation Management was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.
The rapid advancement of generative AI has brought powerful publicly available large language models (LLMs),…
Today, we are thrilled to announce the public beta launch of Gen AI Toolbox for…
The days of trailers and demos are done as the ESA broadens its horizons. The…
A team of AI and robotics researchers at Carnegie Mellon University, working with a pair…
Jasper Research Lab’s new shadow generation research and model enable brands to create more photorealistic…
We’re announcing new updates to Gemini 2.0 Flash, plus introducing Gemini 2.0 Flash-Lite and Gemini…