Beginner Tutorial: Create Pipelines
Part 4: Create the Pipelines
Pipelines make up the bulk of your data processing workflow. Together, they act as a series of stages that transform data from one state to another based on the user code included in their Docker image. A pipeline’s overall behavior is defined by its Pipeline Specification (PPS).
For this tutorial, we’ll use pre-defined Docker images and pipeline specifications so that you can focus on learning the Pachyderm-specific concepts.
Video Converter Pipeline
We want to make sure that our DAG can handle videos in multiple formats, so first we’ll create a pipeline that will:
Skip images
Skip videos already in the correct format (.mp4)
Convert videos to .mp4 format
The converted videos are made available to the next pipeline in the DAG via the video_mp4_converter repo; the user code saves all converted videos to /pfs/out/. This is the standard location for storing output data so that it can be accessed by the next pipeline in the DAG.
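To make the /pfs convention concrete, here is a minimal sketch of user code that reads from a mounted input repo and writes to /pfs/out/. This is not the tutorial’s actual converter (that appears under Reference Assets below); the pass-through logic is an illustrative assumption:

# Minimal sketch of the /pfs convention (illustrative, not the tutorial's converter).
# Inputs are mounted at /pfs/<repo>; anything written to /pfs/out/ becomes the
# pipeline's versioned output.
import os
import shutil

INPUT_DIR = "/pfs/raw_videos_and_images"  # mounted input repo
OUTPUT_DIR = "/pfs/out"                   # everything written here is output

for name in os.listdir(INPUT_DIR):
    src = os.path.join(INPUT_DIR, name)
    # Pass through files that are already .mp4; the real user code below
    # also converts .avi/.mov files.
    if os.path.isfile(src) and name.lower().endswith(".mp4"):
        shutil.copy(src, os.path.join(OUTPUT_DIR, name))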
Pachctl CLI
Console
Open your IDE terminal.
Create a new folder for your project called video-to-frame-traces.
Copy and paste the following pipeline spec into the terminal to create the file.
cat << EOF > video_mp4_converter.yaml
pipeline:
  name: video_mp4_converter
input:
  pfs:
    repo: raw_videos_and_images
    glob: "/*"
transform:
  image: lbliii/video_mp4_converter:1.0.14
  cmd:
    - python3
    - /video_mp4_converter.py
    - --input
    - /pfs/raw_videos_and_images/
    - --output
    - /pfs/out/
autoscaling: true
EOF
Create the pipeline by running the following command:
pachctl create pipeline -f video_mp4_converter.yaml
Navigate to your project.
Select Create > Pipeline.
Update the pipeline spec with the following:
{
  "pipeline": {
    "name": "video_mp4_converter"
  },
  "input": {
    "pfs": {
      "repo": "raw_videos_and_images",
      "glob": "/*"
    }
  },
  "transform": {
    "image": "lbliii/video_mp4_converter:1.0.16",
    "cmd": [
      "python3",
      "/video_mp4_converter.py",
      "--input",
      "/pfs/raw_videos_and_images/",
      "--output",
      "/pfs/out/"
    ]
  },
  "autoscaling": true
}
Select Create Pipeline.
Reference Assets
Diagram
User Code
Output
import cv2
import os
import argparse
import shutil


def video_to_mp4(
    input, output, fps: int = 0, frame_size: tuple = (), fourcc: str = "XVID"
):
    print(f"Converting video: {input}")
    vidcap = cv2.VideoCapture(input)
    if not fps:
        fps = round(vidcap.get(cv2.CAP_PROP_FPS))
    success, arr = vidcap.read()
    if not frame_size:
        height, width, _ = arr.shape
        frame_size = width, height
    writer = cv2.VideoWriter(
        output,
        apiPreference=0,
        fourcc=cv2.VideoWriter_fourcc(*fourcc),
        fps=fps,
        frameSize=frame_size,
    )
    frame_count = 0
    while success:
        frame_count += 1
        writer.write(arr)
        success, arr = vidcap.read()
    writer.release()
    vidcap.release()
    print(f"Converted {frame_count} frames")


def process_video_files(input_path, output_path):
    for root, _, files in os.walk(input_path):
        for file in files:
            file_path = os.path.join(root, file)
            # Skip non-video files and keep processing the rest
            if not file_path.lower().endswith(('.mp4', '.avi', '.mov')):
                print(f"Skipping: {file_path}")
                continue
            # Convert video files to MP4
            if file_path.lower().endswith(('.avi', '.mov')):
                base_file = os.path.splitext(file)[0]
                out_path = os.path.join(output_path, base_file + ".mp4")
                video_to_mp4(file_path, output=out_path)
                print(f"Converted: {file} to {out_path}")
            else:
                # Copy existing MP4 files as-is
                out_path = os.path.join(output_path, file)
                print(f"Copying: {file} to {out_path}")
                shutil.copy(file_path, out_path)


def main():
    parser = argparse.ArgumentParser(
        prog='convert_to_mp4.py',
        description='Convert non-MP4 videos to MP4'
    )
    parser.add_argument('-i', '--input', required=True, help='Input video directory')
    parser.add_argument('-o', '--output', required=True, help='Output video directory')
    args = parser.parse_args()
    if not os.path.exists(args.input):
        print(f"Warning: The specified input directory '{args.input}' does not exist. Please check the path and try again.")
        return
    if not os.path.exists(args.output):
        os.makedirs(args.output)
    print(f"Input: {args.input}")
    print(f"Output: {args.output}")
    print("======================")
    process_video_files(args.input, args.output)


if __name__ == "__main__":
    main()
You can view the output commit by running one of the following commands:
pachctl list commits video_mp4_converter
pachctl list commits video_mp4_converter@master
PROJECT               REPO                BRANCH COMMIT                           FINISHED       SIZE      ORIGIN DESCRIPTION
video-to-frame-traces video_mp4_converter master 30dadf5cfdf447d2b8ed45644653a509 51 minutes ago 14.05 MiB AUTO
You can view the output files by running one of the following commands:
pachctl list files video_mp4_converter@master
pachctl list files video_mp4_converter@<commit-id>
NAME              TYPE SIZE
/cat-sleeping.mp4 file 7.535 MiB
/highway.mp4      file 6.517 MiB
Info
Every pipeline, at minimum, needs a name, an input, and a transform. The input is the data that the pipeline will process, and the transform is the user code that will process the data. transform.image is the Docker image, available in a container registry (Docker Hub), that will be used to run the user code. transform.cmd is the command that will be run inside the Docker container; it is the entrypoint for the user code to be executed against the input data.
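For example, a minimal (hypothetical) pipeline spec containing only those required fields could look like the sketch below; the pipeline name, repo, and image are placeholders, not part of this tutorial:

# Hypothetical minimal pipeline spec: a name, an input, and a transform.
pipeline:
  name: my_pipeline                     # placeholder name
input:
  pfs:
    repo: my_input_repo                 # placeholder input repo
    glob: "/*"                          # one datum per top-level file/directory
transform:
  image: my-registry/my-image:1.0.0     # placeholder image in a registry
  cmd: ["python3", "/my_script.py"]     # entrypoint run inside the container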
Image Flattener Pipeline
Next, we’ll create a pipeline that will flatten the videos into individual .jpg image frames. Like the previous pipeline, the user code outputs the frames to /pfs/out so that the next pipeline in the DAG can access them in the image_flattener repo.
Pachctl CLI
Console
cat << EOF > image_flattener.yaml
pipeline:
  name: image_flattener
input:
  pfs:
    repo: video_mp4_converter
    glob: "/*"
transform:
  image: lbliii/image_flattener:1.0.0
  cmd:
    - python3
    - /image_flattener.py
    - --input
    - /pfs/video_mp4_converter
    - --output
    - /pfs/out/
autoscaling: true
EOF
pachctl create pipeline -f image_flattener.yaml
Select Create > Pipeline.
Update the pipeline spec with the following:
{
  "pipeline": {
    "name": "image_flattener"
  },
  "input": {
    "pfs": {
      "repo": "video_mp4_converter",
      "glob": "/*"
    }
  },
  "transform": {
    "image": "lbliii/image_flattener:1.0.0",
    "cmd": [
      "python3",
      "/image_flattener.py",
      "--input",
      "/pfs/video_mp4_converter",
      "--output",
      "/pfs/out/"
    ]
  },
  "autoscaling": true
}
Select Create Pipeline.
Reference Assets
Diagram
User Code
Output
#!/usr/bin/env python3
"""
Take video input and flatten it to individual frames/images.
"""
import cv2
import os
import argparse
import pathlib


def flatten_video(vid_path, out_path, base_file):
    os.makedirs(out_path, exist_ok=True)
    current_frame = 0
    video = cv2.VideoCapture(vid_path)
    while True:
        ret, frame = video.read()
        if ret:
            name = os.path.join(out_path, f'{base_file}-frame-{current_frame:010d}.jpg')
            cv2.imwrite(name, frame)
            current_frame += 1
        else:
            break
    video.release()


def process_video_files(input_path, output_path):
    for root, _, files in os.walk(input_path):
        for file in files:
            file_path = os.path.join(root, file)
            if pathlib.Path(file_path).suffix.lower() == '.mp4':
                base_file = os.path.splitext(file)[0]
                # Frames for each video are written to a directory named after the video
                out_path = os.path.join(output_path, base_file)
                print(f"Converting: {file}")
                flatten_video(file_path, out_path, base_file)
            else:
                print(f"Skipping: {file_path}")


def main():
    parser = argparse.ArgumentParser(
        prog='flatten_to_images.py',
        description='Flatten Video to Images'
    )
    parser.add_argument('-i', '--input', required=True, help='Input video directory')
    parser.add_argument('-o', '--output', required=True, help='Output image directory')
    args = parser.parse_args()
    if not os.path.exists(args.input):
        print(f"Warning: The specified input directory '{args.input}' does not exist. Please check the path and try again.")
        return
    if not os.path.exists(args.output):
        os.makedirs(args.output)
    print(f"Input: {args.input}")
    print(f"Output: {args.output}")
    print("======================")
    process_video_files(args.input, args.output)


if __name__ == "__main__":
    main()
pachctl list files image_flattener@master
Image Tracing Pipeline
Up until this point, we’ve used a simple single input from the Pachyderm file system (input.pfs) and a basic glob pattern (/*) to specify the shape of our datums. This particular pattern treats each top-level file and directory as a single datum. However, in this pipeline, we have some special requirements:
We want to process only the raw images from the raw_videos_and_images repo
We want to process all of the flattened video frame images from the image_flattener pipeline
To achieve this, we’re going to need to use a union input (input.union) to combine the two inputs into a single input for the pipeline.
For the raw_videos_and_images input, we can use a more powerful glob pattern to ensure that only image files are processed (/*.{png,jpg,jpeg})
For the image_flattener input, we can use the same glob pattern as before (/*) to ensure that each video’s collection of frames is processed together
Notice how we also update transform.cmd to accommodate having two inputs.
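As a sketch, here is how this union divides work into datums (file names are hypothetical, based on this tutorial’s sample data). Each datum contains files from only one branch of the union, which is why the user code below accepts multiple --input paths and skips any path that does not exist:

datum 1: /pfs/raw_videos_and_images/cat-picture.png   (one raw image)
datum 2: /pfs/image_flattener/cat-sleeping/...        (all frames for one video)
datum 3: /pfs/image_flattener/highway/...             (all frames for one video)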
Pachctl CLI
Console
cat << EOF > image_tracer.yaml
pipeline:
  name: image_tracer
description: A pipeline that performs image edge detection by using the OpenCV library.
input:
  union:
    - pfs:
        repo: raw_videos_and_images
        glob: "/*.{png,jpg,jpeg}"
    - pfs:
        repo: image_flattener
        glob: "/*"
transform:
  image: lbliii/image_tracer:1.0.8
  cmd:
    - python3
    - /image_tracer.py
    - --input
    - /pfs/raw_videos_and_images
    - /pfs/image_flattener
    - --output
    - /pfs/out/
autoscaling: true
EOF
pachctl create pipeline -f image_tracer.yaml
Select Create > Pipeline.
Update the pipeline spec with the following:
{
  "pipeline": {
    "name": "image_tracer"
  },
  "description": "A pipeline that performs image edge detection by using the OpenCV library.",
  "input": {
    "union": [
      {
        "pfs": {
          "repo": "raw_videos_and_images",
          "name": "tracer",
          "glob": "/*.{png,jpg,jpeg}"
        }
      },
      {
        "pfs": {
          "repo": "image_flattener",
          "name": "tracer",
          "glob": "/*"
        }
      }
    ]
  },
  "transform": {
    "image": "lbliii/image_tracer:1.0.9",
    "cmd": [
      "python3",
      "/image_tracer.py",
      "--input",
      "/pfs/tracer",
      "--output",
      "/pfs/out/"
    ]
  },
  "autoscaling": true
}
Select Create Pipeline.
Reference Assets
Diagram
User Code
Output
import cv2
import os
import argparse
from matplotlib import pyplot as plt


def make_edges(image, input_path, output_path):
    try:
        img = cv2.imread(image)
        if img is None:
            print(f"Error reading image: {image}")
            return
        tail = os.path.split(image)[1]
        subdirectory = os.path.relpath(os.path.dirname(image), input_path)
        # Check if the file is in the top-level directory
        if subdirectory == '.':
            output_subdir = output_path
            output_filename = os.path.splitext(tail)[0] + '_edges.png'
        else:
            output_subdir = os.path.join(output_path, subdirectory + "_edges")
            output_filename = os.path.splitext(tail)[0] + '.png'
        os.makedirs(output_subdir, exist_ok=True)
        edges = cv2.Canny(img, 100, 200)
        output_filepath = os.path.join(output_subdir, output_filename)
        plt.imsave(output_filepath, edges, cmap='gray')
        print(f"Processed: {image} -> {output_filepath}")
    except Exception as e:
        print(f"Error processing image: {image}")
        print(f"Error details: {e}")


def process_image_files(input_path, output_path):
    for dirpath, dirs, files in os.walk(input_path):
        total = len(files)
        current = 0
        for file in files:
            current += 1
            print(f"Processing {file}, #{current} of {total}")
            make_edges(os.path.join(dirpath, file), input_path, output_path)


def main():
    parser = argparse.ArgumentParser(
        prog='image_tracer.py',
        description='Trace Images'
    )
    parser.add_argument('-i', '--input', nargs='+', required=True, help='Input image directory')
    parser.add_argument('-o', '--output', required=True, help='Output image directory')
    args = parser.parse_args()
    print(f"Input: {args.input}")
    print(f"Output: {args.output}")
    print("======================")
    # Process every input path; warn (rather than exit) on any that are missing
    for input_path in args.input:
        if not os.path.exists(input_path):
            print(f"Warning: The specified input directory '{input_path}' does not exist. Please check the path and try again.")
        else:
            print(f"Processing images in: {input_path}")
            process_image_files(input_path, args.output)
    print("Done.")


if __name__ == "__main__":
    main()
pachctl list files image_tracer@master
Info
Since this pipeline is tracing every frame produced from the videos, it may take a few minutes to complete.
Gif Pipeline
Next, we’ll create a pipeline that will create two gifs:
A gif of the original video’s flattened frames (from the image_flattener output repo)
A gif of the video’s traced frames (from the image_tracer output repo)
To make a gif of both the original video frames and the traced frames, we’re going to again need to use a union input so that we can process the image_flattener and image_tracer output repos.
Notice that the glob pattern has changed; here, we want to treat each directory in an input as a single datum, so we use the glob pattern /!(*.*), which matches only top-level entries without a file extension. This works because we’ve declared in the user code to store the video frames in a directory with the same name as the video file.
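As a sketch (directory names are hypothetical, derived from the sample videos), /!(*.*) matches top-level entries whose names contain no dot (the frame directories) and skips loose files:

/cat-sleeping            -> matched: one datum (original frames)
/cat-sleeping_edges      -> matched: one datum (traced frames)
/cat-picture_edges.png   -> not matched: the name contains a dot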
Pachctl CLI
Console
cat << EOF > movie_gifer.yaml
pipeline:
  name: movie_gifer
description: A pipeline that converts frames into a gif using the OpenCV library.
input:
  union:
    - pfs:
        repo: image_flattener
        glob: "/!(*.*)"
    - pfs:
        repo: image_tracer
        glob: "/!(*.*)"
transform:
  image: lbliii/movie_gifer:1.0.5
  cmd:
    - python3
    - /movie_gifer.py
    - --input
    - /pfs/image_flattener
    - /pfs/image_tracer
    - --output
    - /pfs/out/
autoscaling: true
EOF
pachctl create pipeline -f movie_gifer.yaml
Select Create > Pipeline.
Update the pipeline spec with the following:
{
  "pipeline": {
    "name": "movie_gifer"
  },
  "description": "A pipeline that converts frames into a gif using the OpenCV library.",
  "input": {
    "union": [
      {
        "pfs": {
          "repo": "image_flattener",
          "name": "gifer",
          "glob": "/!(*.*)"
        }
      },
      {
        "pfs": {
          "repo": "image_tracer",
          "name": "gifer",
          "glob": "/!(*.*)"
        }
      }
    ]
  },
  "transform": {
    "image": "lbliii/movie_gifer:1.0.6",
    "cmd": [
      "python3",
      "/movie_gifer.py",
      "--input",
      "/pfs/gifer",
      "--output",
      "/pfs/out/"
    ]
  },
  "autoscaling": true
}
Select Create Pipeline.
Reference Assets
Diagram
User Code
Output
import cv2
import os
import argparse
import imageio


def make_gifs(directory, input_path, output_path):
    try:
        # Create output folder if it doesn't exist
        relative_dir = os.path.relpath(directory, input_path)
        output_folder = os.path.join(output_path, relative_dir)
        os.makedirs(output_folder, exist_ok=True)
        # Get all the images in the provided directory
        images = [os.path.join(directory, f) for f in os.listdir(directory) if f.endswith('.jpg') or f.endswith('.png')]
        # Sort the frames so they are in order
        images.sort()
        # Create the output filename
        tail = os.path.split(directory)[1]
        base_filename = os.path.splitext(tail)[0]
        if 'tracer' in input_path:
            suffix = 'edges'
        else:
            suffix = 'original'
        output_filename = f'{base_filename}_{suffix}.gif'
        output_filepath = os.path.join(output_folder, output_filename)
        # Create the gif
        gif_images = [cv2.imread(i)[:, :, ::-1] for i in images]  # Convert BGR to RGB
        imageio.mimsave(output_filepath, gif_images, duration=0.1)
        print(f"Processed: {directory} -> {output_filepath}")
    except Exception as e:
        print(f"Error processing directory: {directory}")
        print(f"Error details: {e}")


def process_image_directories(input_path, output_path):
    # For each directory of images, make a gif
    for dirpath, dirs, files in os.walk(input_path):
        total = len(dirs)
        current = 0
        for dir in dirs:
            current += 1
            print(f"Processing {dir}, #{current} of {total}")
            make_gifs(os.path.join(dirpath, dir), input_path, output_path)


def main():
    parser = argparse.ArgumentParser(
        prog='movie_gifer.py',
        description='Convert images to gifs'
    )
    parser.add_argument('-i', '--input', nargs='+', required=True, help='Input image directory')
    parser.add_argument('-o', '--output', required=True, help='Output image directory')
    args = parser.parse_args()
    print(f"Input: {args.input}")
    print(f"Output: {args.output}")
    print("======================")
    for input_path in args.input:
        if not os.path.exists(input_path):
            print(f"Warning: The specified input directory '{input_path}' does not exist. Please check the path and try again.")
        else:
            print(f"Processing images in: {input_path}")
            process_image_directories(input_path, args.output)
    print("Done.")


if __name__ == "__main__":
    main()
pachctl list files movie_gifer@master
Info
Since this pipeline is converting video frames to gifs, it may take a few minutes to complete.
Content Shuffler Pipeline
We have everything we need to make the comparison collage, but before we do that we need to re-shuffle the content so that the original images and gifs are in one directory (originals) and the traced images and gifs are in another directory (edges). This will help us more easily process the data via our user code for the collage. This is a common step you will encounter while using HPE Machine Learning Data Management, referred to as a shuffle pipeline.
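Here is a sketch of the intended re-shuffle (paths are hypothetical, based on the outputs of the earlier pipelines); individual frame files, whose names contain "frame", are dropped:

/pfs/raw_videos_and_images/cat-picture.png       -> /pfs/out/originals/
/pfs/movie_gifer/.../cat-sleeping_original.gif   -> /pfs/out/originals/
/pfs/image_tracer/cat-picture_edges.png          -> /pfs/out/edges/
/pfs/movie_gifer/.../*_edges.gif                 -> /pfs/out/edges/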
Pachctl CLI
Console
cat << EOF > content_shuffler.yaml
pipeline:
  name: content_shuffler
description: A pipeline that collapses our inputs into one datum for the collager.
input:
  union:
    - pfs:
        repo: movie_gifer
        glob: "/"
    - pfs:
        repo: raw_videos_and_images
        glob: "/*.{png,jpg,jpeg}"
    - pfs:
        repo: image_tracer
        glob: "/*.{png,jpg,jpeg}"
transform:
  image: lbliii/content_shuffler:1.0.0
  cmd:
    - python3
    - /content_shuffler.py
    - --input
    - /pfs/movie_gifer
    - /pfs/raw_videos_and_images
    - /pfs/image_tracer
    - --output
    - /pfs/out/
autoscaling: true
EOF
pachctl create pipeline -f content_shuffler.yaml
Select Create > Pipeline.
Update the pipeline spec with the following:
{
  "pipeline": {
    "name": "content_shuffler"
  },
  "description": "A pipeline that collapses our inputs into one datum for the collager.",
  "input": {
    "union": [
      {
        "pfs": {
          "repo": "movie_gifer",
          "name": "shuffler",
          "glob": "/"
        }
      },
      {
        "pfs": {
          "repo": "raw_videos_and_images",
          "name": "shuffler",
          "glob": "/*.{png,jpg,jpeg}"
        }
      },
      {
        "pfs": {
          "repo": "image_tracer",
          "name": "shuffler",
          "glob": "/*.{png,jpg,jpeg}"
        }
      }
    ]
  },
  "transform": {
    "image": "lbliii/content_shuffler:1.0.1",
    "cmd": [
      "python3",
      "/content_shuffler.py",
      "--input",
      "/pfs/shuffler",
      "--output",
      "/pfs/out/"
    ]
  },
  "autoscaling": true
}
Select Create Pipeline.
Reference Assets
Diagram
User Code
Output
# content_shuffler.py
import argparse
import os
import shutil


def shuffle_content(input_path, output_path):
    # Create an originals and edges directory in the output path
    originals_output_path = f"{output_path}/originals"
    if not os.path.exists(originals_output_path):
        os.makedirs(originals_output_path)
    edges_output_path = f"{output_path}/edges"
    if not os.path.exists(edges_output_path):
        os.makedirs(edges_output_path)
    for dirpath, dirs, files in os.walk(input_path):
        for file in files:
            if "frame" not in file and "edges" not in file:
                # Copy the original images and gifs to the originals directory
                shutil.copy(f"{dirpath}/{file}", originals_output_path)
            elif "frame" not in file and "edges" in file:
                # Copy the traced images and gifs to the edges directory
                shutil.copy(f"{dirpath}/{file}", edges_output_path)


def main():
    parser = argparse.ArgumentParser(
        prog='content_shuffler.py',
        description='Shuffle content into originals and edges directories'
    )
    parser.add_argument('-i', '--input', nargs='+', required=True, help='Input image directory')
    parser.add_argument('-o', '--output', required=True, help='Output image directory')
    args = parser.parse_args()
    print(f"Input: {args.input}\nOutput: {args.output}\n")
    total_inputs = len(args.input)
    current_input = 0
    for input_path in args.input:
        try:
            current_input += 1
            print(f"{input_path}; {current_input}/{total_inputs}")
            shuffle_content(input_path, args.output)
        except Exception as e:
            print(f"Exception: {e}")


if __name__ == "__main__":
    main()
pachctl list files content_shuffler@master
Content Collager Pipeline
Finally, we’ll create a pipeline that produces a static HTML page for viewing the original and traced content side by side.
Pachctl CLI
Console
cat << EOF > content_collager.yaml
pipeline:
  name: content_collager
description: A pipeline that creates a static HTML collage.
input:
  pfs:
    glob: "/"
    repo: content_shuffler
transform:
  image: lbliii/content_collager:1.0.64
  cmd:
    - python3
    - /content_collager.py
    - --input
    - /pfs/content_shuffler
    - --output
    - /pfs/out/
autoscaling: true
EOF
pachctl create pipeline -f content_collager.yaml
Select Create > Pipeline.
Update the pipeline spec with the following:
{
  "pipeline": {
    "name": "content_collager"
  },
  "description": "A pipeline that creates a static HTML collage.",
  "input": {
    "pfs": {
      "glob": "/",
      "repo": "content_shuffler"
    }
  },
  "transform": {
    "image": "lbliii/content_collager:1.0.64",
    "cmd": [
      "python3",
      "/content_collager.py",
      "--input",
      "/pfs/content_shuffler",
      "--output",
      "/pfs/out/"
    ]
  },
  "autoscaling": true
}
Select Create Pipeline.
Reference Assets
Diagram
User Code
Output
# content_collager.py
import os
import argparse
from bs4 import BeautifulSoup
import shutil

html_template = """
<html>
<head>
<title>Content Collage</title>
<style>
#collage-container {
    display: flex;
}
#originals,
#edges {
    flex: 1;
    padding: 20px;
    border: 1px solid #ccc;
    display: flex;
    flex-direction: column;
    align-items: center;
    gap: 10px;
}
</style>
</head>
<body>
<h1>Content collage</h1>
<div id="collage-container">
    <div id="originals">
        <h2>Original</h2>
    </div>
    <div id="edges">
        <h2>Edges</h2>
    </div>
</div>
</body>
</html>
"""


def create_html_page(output_path):
    index_path = os.path.join(output_path, "collage", "index.html")
    if not os.path.exists(os.path.join(output_path, "collage")):
        os.makedirs(os.path.join(output_path, "collage"))
    if not os.path.exists(index_path):
        with open(index_path, "w") as f:
            f.write(html_template)


def append_image_to_html_page(output_path, image_path):
    index_path = os.path.join(output_path, "collage", "index.html")
    with open(index_path, "r") as f:
        html_content = f.read()
    soup = BeautifulSoup(html_content, "html.parser")
    # If the image path has the word "edges" in it, add it to the edges div
    if "edges" in image_path:
        edges_div = soup.find("div", id="edges")
        if edges_div:
            img_tag = soup.new_tag("img", src=image_path, width="300", style="display: block;")
            edges_div.append(img_tag)
        with open(index_path, "w") as f:
            f.write(str(soup))
    # Otherwise, add it to the originals div
    else:
        originals_div = soup.find("div", id="originals")
        if originals_div:
            img_tag = soup.new_tag("img", src=image_path, width="300", style="display: block;")
            originals_div.append(img_tag)
        with open(index_path, "w") as f:
            f.write(str(soup))


def process_content(input_path, output_path):
    try:
        # Create the HTML page
        create_html_page(output_path)
        # Create the output directory /collage/static
        static_output_path = os.path.join(output_path, "collage", "static")
        if not os.path.exists(static_output_path):
            os.makedirs(static_output_path)
        for dirpath, _, files in os.walk(input_path):
            sorted_files = sorted(files)
            for file in sorted_files:
                print(f"Copying {file} to {static_output_path}")
                shutil.copy(os.path.join(dirpath, file), os.path.join(static_output_path, file))
                append_image_to_html_page(output_path, f"./static/{file}")
    except Exception as e:
        print(f"Exception: {e}")


def main():
    parser = argparse.ArgumentParser(
        prog='content_collager.py',
        description='Convert images and gifs into a collage'
    )
    parser.add_argument('-i', '--input', nargs='+', required=True, help='Input image directory')
    parser.add_argument('-o', '--output', required=True, help='Output image directory')
    args = parser.parse_args()
    print(f"Input: {args.input}\nOutput: {args.output}\n")
    try:
        # This pipeline has a single input, so only the first path is used
        process_content(args.input[0], args.output)
    except Exception as e:
        print(f"Exception: {e}")


if __name__ == "__main__":
    main()
Navigate to your project in Console.
Click on the content_collager pipeline’s output directory.
Click Inspect Commit.
Select the checkbox next to the collage directory.
Select the Download button.
Unzip and open the index.html file in your browser to see the collage.
pachctl list files content_collager@master
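If you prefer the CLI, you should also be able to download the collage directory directly; this is a sketch, assuming your pachctl version supports the recursive and output flags on get file:

pachctl get file -r content_collager@master:/collage -o ./collage

Then open ./collage/index.html in your browser.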