Datum Batching
Learn how to batch datums to optimize performance.
December 4, 2023
By default, HPE ML Data Management processes each datum independently. This means that your user code is called once for each datum. This can be inefficient and costly if you have a large number of small datums or if your user code is slow to start.
When you have a large number of datums, you can batch them to optimize performance. HPE ML Data Management provides a `next datum` command that you can use to batch datums.
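To see why batching helps, consider a fixed startup cost (importing libraries, loading a model) that is paid once per datum without batching, but only once per worker with batching. This toy calculation (not Pachyderm code; the costs are made-up illustrative numbers) sketches the difference:

```python
# Toy illustration of batching economics. The numbers below are
# hypothetical; real costs depend on your user code.
STARTUP_COST = 5.0    # e.g., seconds to import libraries / load a model
PER_DATUM_COST = 0.1  # seconds of actual work per datum

def cost_without_batching(n):
    # User code starts fresh for every datum.
    return n * (STARTUP_COST + PER_DATUM_COST)

def cost_with_batching(n):
    # User code starts once, then loops over datums.
    return STARTUP_COST + n * PER_DATUM_COST

print(cost_without_batching(1000))  # 5100.0 seconds
print(cost_with_batching(1000))     # 105.0 seconds
```

The larger the startup cost relative to per-datum work, the bigger the win from batching.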
Flow Diagram #
```mermaid
flowchart LR
    user_code(User Code)
    ndsuccess(NextDatum)
    nderror("NextDatum(error)")
    response(NextDatumResponse)
    process_datum{process datum}
    cmd_err(Run cmd_err)
    kill[Kill User Code]
    datum?{datum exists?}
    retry?{retry?}
    cmd_err?{cmd_err defined}
    user_code ==> ndsuccess
    ndsuccess =====> datum?
    datum? ==>|yes| process_datum
    process_datum ==>|success| response
    response ==> user_code
    datum? -->|no| kill
    process_datum -->|fail| nderror
    nderror --> cmd_err?
    cmd_err? -->|yes| cmd_err
    cmd_err? -->|no| kill
    cmd_err --> retry?
    retry? -->|yes| response
    retry? -->|no| kill
```
How to Batch Datums #
Define your user code and build a Docker image. Your user code must call `pachctl next datum` to get the next datum to process.

```shell
transformation() {
    # Your transformation code goes here
    echo "Transformation function executed"
}

echo "Starting while loop"
while true; do
    pachctl next datum
    echo "Next datum called"
    transformation
done
```
Your user code can apply the `@batch_all_datums` convenience decorator to iterate through all datums. This will perform the `NextDatum` calls for you as well as prepare the environment for each datum.

```python
import os

from pachyderm_sdk import batch_all_datums

@batch_all_datums
def main():
    # Processing code goes here.
    # This function will be run for each datum until all are processed.
    # Once all datums are processed, the process is terminated.
    print(f'datum processed: {os.environ["PACH_DATUM_ID"]}')

def init():
    # Initializing code goes here.
    # When this function is called, no input data is present.
    print('Preparing for datum batching job')

if __name__ == '__main__':
    init()
    print('Starting datum processing')
    main()
```
Create a repo (e.g., `pachctl create repo repoName`).

Define a pipeline spec in YAML or JSON that references your Docker image and repo.

Add the following to the `transform` section of your pipeline spec: `datum_batching: true`

```yaml
pipeline:
  name: p_datum_batching_example
input:
  pfs:
    repo: repoName
    glob: "/*"
transform:
  datum_batching: true
  image: user/docker-image:tag
```
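Pipeline specs can also be written in JSON. As a quick sanity check, the spec above corresponds to the structure below (generated here with Python's standard library; `user/docker-image:tag` is the same placeholder image name as above):

```python
import json

# The pipeline spec as a Python dict; field names mirror the YAML
# example in this section.
spec = {
    "pipeline": {"name": "p_datum_batching_example"},
    "input": {"pfs": {"repo": "repoName", "glob": "/*"}},
    "transform": {
        "datum_batching": True,
        "image": "user/docker-image:tag",
    },
}

# Serialize to JSON, suitable for saving as pipeline.json.
print(json.dumps(spec, indent=2))
```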
Create the pipeline (e.g., `pachctl update pipeline -f pipeline.yaml`).

Monitor the pipeline's state either via Console or via `pachctl list pipeline`.
You can view the printed confirmation of "Next datum called" in the logs of your pipeline's job.
FAQ #
Q: My pipeline started but no files from my input repo are present. Where are they?
A: Files from the first datum are mounted following the first call to `NextDatum` or, when using the Python client, when code execution enters the decorated function.
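Because input files only appear once `NextDatum` has been called (or the decorated function has been entered), any file-dependent logic belongs inside that function, not in initialization code. A minimal sketch of a helper for checking what is currently mounted (`input_files` is a hypothetical name, and `/pfs/repoName` assumes the repo name used earlier in this page):

```python
import os

def input_files(root):
    # Collect all regular files below `root`. Returns [] when nothing
    # is mounted yet (i.e., before the first NextDatum call).
    found = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            found.append(os.path.join(dirpath, name))
    return sorted(found)

# Inside the decorated main(), the current datum's input would be
# visible under /pfs, e.g.:
# files = input_files("/pfs/repoName")
```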
Q: How can I set environment variables when the datum runs?
A: You can use the `.env` file accessible from the `/pfs` directory. To easily locate your `.env` file, you can do the following:
```python
import glob
import os

def find_files(pattern):
    # Search every mounted repo under /pfs for files matching `pattern`.
    return [f for f in glob.glob(os.path.join("/pfs", "**", pattern), recursive=True)]

env_file = find_files(".env")
```
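Once located, the `.env` file can be loaded without extra dependencies. A minimal sketch (this parser assumes plain `KEY=value` lines with no quoting or multi-line values, a format the docs do not specify; `load_env_file` is a hypothetical helper name):

```python
import os

def load_env_file(path):
    # Parse simple KEY=value lines into os.environ, skipping blank
    # lines and comments. Assumes no quoting or multi-line values.
    loaded = {}
    with open(path) as fh:
        for line in fh:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            loaded[key.strip()] = value.strip()
    os.environ.update(loaded)
    return loaded
```

A third-party loader such as `python-dotenv` would serve the same purpose if it is available in your image.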