Run Commands

Intro to Pipelines

Learn about the Pipeline System and how to define pipelines in YAML for data transformation and processing, including datums, jobs, and advanced glob patterns.

January 30, 2023

Introduction to Pipelines #

The HPE ML Data Management Pipeline System (PPS) is a powerful tool for automating data transformations. With PPS, pipelines can be automatically triggered whenever input data changes, meaning that data transformations happen automatically in response to changes in your data, without the need for manual intervention.

Pipelines in HPE ML Data Management are defined by a pipeline specification and run on Kubernetes. The output of a pipeline is stored in a versioned data repository, which allows you to reproduce any transformation that occurs in HPE ML Data Management.

Pipelines can be combined into a computational DAG (directed acyclic graph), with each pipeline being triggered when an upstream commit is finished. This allows you to build complex workflows that can process large amounts of data efficiently and with minimal manual intervention.

Pipeline Specification #

This is a HPE ML Data Management pipeline definition in YAML. It describes a pipeline called transform that takes data from the data repository and transforms it using a Python script

  name: transform
    repo: data
    glob: "/*"
  image: my-transform-image:v1.0
    - python
    - "/"
    - "--input"
    - "/pfs/data/"
    - "--output"
    - "/pfs/out/"

Here’s a breakdown of the different sections of the pipeline definition:

So, in summary, this pipeline definition defines a pipeline called transform that takes all files in the data repository, runs a Python script to transform them, and outputs the results to the out repository.

Datums and Jobs #

Pipelines can distribute work across a cluster to parallelize computation. Each time data is committed to a HPE ML Data Management repository, a job is created for each pipeline with that repo as an input to process the data.

To determine how to distribute data and computational work, datums are used. A datum is an indivisible unit of data required by the pipeline, defined according to the pipeline spec. The datums will be distributed across the cluster to be processed by workers.


Only one job per pipeline will be created per commit, but there may be many datums per job.

Datum processing diagram

For example, say you have a bunch of images that you want to normalize to a single size. You could iterate through each image and use opencv to change the size of it. No image depends on any other image, so this task can be parallelized by treating each image as an individual unit of work, a datum.

Next, let’s say you want to create a collage from those images. Now, we need to consider all of the images together to combine them. In this case, the collection of images would be a single datum, since they are all required for the process.

HPE ML Data Management input specifications can handle both of these situations with the glob section of the Pipeline Specification.

Basic Glob Patterns #

In this section we’ll introduce glob patterns and datums in a couple of examples.

In the basic glob pattern example below, the input glob pattern is /*. This pattern matches each image at the top level of the images@master branch as an individual unit of work.

  name: resize
description: A pipeline that resizes an image.
    glob: /*
    repo: images
    - python
    - --input
    - /pfs/images/*
    - --output
    - /pfs/out/
  image: pachyderm/opencv

When the pipeline is executed, it retrieves the datums defined in the input specification. For each datum, the worker downloads the necessary files into the Docker container at the start of its execution, and then performs the transform. Once the execution is complete, the output for each execution is combined into a commit and written to the output data repository.

Datum diagram glob /*

In this example, the input glob pattern is /. This pattern matches everything at the top level of the images@master branch as an individual unit of work.

  name: collage
description: A pipeline that creates a collage for a collection of images.
    glob: /
    repo: images
    - python
    - --input
    - /pfs/images/*
    - --output
    - /pfs/out/
  image: pachyderm/opencv

When this pipeline runs, it retrieves a single datum from the input specification. The job runs the single datum, downloading all the files from the images@master into the Docker container, and performs the transform. The result is then committed to the output data repository.

Datum diagram glob

Advanced Glob Patterns #

Datums can also be created from advanced operations, such as Join, Cross, Group, Union, and others to combine glob patterns from multiple data repositories. This allows us to create complex datum definitions, enabling sophisticated data processing pipelines.

Pipeline Communication (Advanced) #

A much more detailed look at how HPE ML Data Management actually triggers pipelines is shown in the sequence diagram below. This is a much more advanced level of detail, but knowing how the different pieces of the platform interact can be useful.

Before we look at the diagram, it may be helpful to provide a brief recap of the main participants involved:

   participant User
   participant PPS
   participant PFS
   participant Worker
   User->>PFS: pachctl create repo foo
   activate PFS
   Note over PFS: create branch foo@master
   deactivate PFS
   User->>PPS: pachctl create pipeline bar
   activate PPS
   PPS->>PFS: create branch bar@master <br> (provenant on foo@master)
   PPS->>Worker: create pipeline worker master
   Worker->>PFS: subscribe to bar@master <br> (because it's subvenant on foo@master)
   deactivate PPS
   User->>PFS: pachctl put file -f foo@master data.txt
   activate PFS
   Note over PFS: start commit
   PFS->>PFS: propagate commit <br> (start downstream commits)
   Note over PFS: copy data.txt to open commit
   Note over PFS: finish commit
   PFS-->>Worker: subscribed commit returns
   deactivate PFS

   Note over Worker: Pipeline Triggered
   activate Worker
   Worker->>PPS: Create job
   Worker->>PFS: request datums for commit
   PFS-->>Worker: Datum list
   loop Each datum
       PFS->>Worker: download datum
       Note over Worker: Process datum with user code
       Worker->>PFS: copy data to open output commit
   Worker->>PFS: Finish commit
   Worker->>PPS: Finish job
   deactivate Worker

This diagram illustrates the data flow and interaction between the user, the HPE ML Data Management Pipeline System (PPS), the HPE ML Data Management File System (PFS), and a worker node when creating and running a HPE ML Data Management pipeline. Note, this is simplified for the single worker case. The multi-worker and autoscaling mechanisms are more complex.

The sequence of events begins with the user creating a PFS repo called foo and a PPS pipeline called bar with the foo repo as its input. When the pipeline is created, PPS creates a branch called bar@master, which is provenant on the foo@master branch in PFS. A worker pod is then created in the Kubernetes cluster by PPS, which subscribes to the bar@master branch.

When the user puts a file named data.txt into the foo@master branch, PFS starts a new commit and propagates the commit, opening downstream commits for anything impacted. The worker receives the subscribed commit and when it finishes, triggers the pipeline.

The triggered pipeline creates a job for the pipeline, requesting datums for the output commit. For each datum, the worker downloads the data, processes it with the user’s code, and writes the output to an open output commit in PFS. Once all datums have been processed, the worker finishes the output commit and the job is marked as complete.