How To Predict Multiple Images In Keras At A Time Using Multiple-processing (e.g. With Different Cpus)?
Solution 1:
One Python package I know of that may help you is joblib. I hope it can solve your problem.
from joblib import Parallel, delayed
import cv2
import numpy as np
from keras.models import load_model

# load model
mymodel = load_model('190704_1_fcs_plotclassifier.h5')

# Define callback function to collect the output in 'outcomes'
# (note: joblib's Parallel returns the results directly; a callback like
# this is only needed with e.g. multiprocessing's apply_async)
outcomes = []

def collect_result(result):
    global outcomes
    outcomes.append(result)

# Define prediction function
def prediction(img):
    img = cv2.resize(img, (49, 49))
    img = img.astype('float32') / 255
    img = np.reshape(img, [1, 49, 49, 3])
    status = mymodel.predict(img)
    status = status[0][1]
    return status

# Define evaluate function
def evaluate(i, figure):
    # predict the probability of the picture to be in class 0 or 1
    img = cv2.imread(figure)
    status = prediction(img)
    outcome = [figure, status]
    return (i, outcome)

outcomes = Parallel(n_jobs=72)(delayed(evaluate)(i, figure)
                               for i, figure in enumerate(listoffigurepaths))
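A caveat worth adding ( a sketch of my own, not part of the original answer ): a pre-loaded Keras / TensorFlow model frequently does not survive being forked into worker processes, so a common workaround is to let each worker load and cache its own copy of the model. Here evaluate_with_own_model is a hypothetical helper, assuming the same file names as above:

from joblib import Parallel, delayed

_worker_model = None                                # cached once per worker process

def evaluate_with_own_model(i, figure):             # hypothetical helper
    global _worker_model
    if _worker_model is None:                       # lazy-load inside the worker
        from keras.models import load_model
        _worker_model = load_model('190704_1_fcs_plotclassifier.h5')
    img = cv2.resize(cv2.imread(figure), (49, 49))
    img = np.reshape(img.astype('float32') / 255, [1, 49, 49, 3])
    return (i, [figure, _worker_model.predict(img)[0][1]])

outcomes = Parallel(n_jobs=4)(delayed(evaluate_with_own_model)(i, figure)
                              for i, figure in enumerate(listoffigurepaths))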
Solution 2:
Does a processing-speed, or a size-of-RAM, or a number-of-CPU-cores, or an introduced add-on processing latency matter most? ALL OF THESE DO:
The python multiprocessing module is known ( and joblib does the same ) to:

The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.

Yet, as everything in our Universe, this comes at a cost:
The wish, expressed by the O/P as:

To speed up the process, I would like to use multiple-processing with CPUs (I have 72 available)

will, for this kind of application of a pre-trained mymodel.predict(), if sent into a Pool( 72 )-execution, almost for sure suffocate almost any hardware RAM by swapping.
Here is an example, where a "just"-do-nothing worker was spawned by the n_jobs = 100 directive, to see what happens: time-wise ~ 532+ [ms] are lost, and memory-allocation-wise XYZ [GB] of RAM are immediately allocated by the O/S.
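The measurement itself takes only a few lines to reproduce; a minimal sketch of such a do-nothing spawning benchmark ( the concrete numbers will of course differ per platform ):

import time
from joblib import Parallel, delayed

def do_nothing():
    return None

t0 = time.perf_counter()
_ = Parallel(n_jobs=100)(delayed(do_nothing)() for _ in range(100))
# compare this against a plain serial loop to see the pure spawning overhead
print("spawn + run of 100 do-nothing jobs took ~{0:.0f} [ms]".format(
      (time.perf_counter() - t0) * 1e3))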
This comes from the fact that each multiprocessing-spawned sub-process ( not threads, as the O/P has already experienced on her own ) is first instantiated ( after an adequate add-on latency due to O/S process/RAM-allocation management ) as a ---FULL-COPY--- of the ecosystem present inside the original python process: the complete python interpreter + all its import-ed modules + all its internal state and data structures, used or not. So indeed huge amounts of RAM allocations take place. Have you noticed the platform starting to SWAP? Count how many sub-processes were spawned until that time, and you have a ceiling on how many such can fit in-RAM; it has devastating performance effects to try ( or to let, by using the joblib-s n_jobs = -1 auto-scaling directive ) to populate more sub-processes than this SWAP-introducing number...
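To turn that RAM story into hard numbers too, one can inspect the spawned children directly; a rough sketch, assuming the psutil package is installed ( not used in the original answer ):

import os
import psutil

me = psutil.Process(os.getpid())
kids = me.children(recursive=True)                  # the spawned sub-processes
rss = sum(kid.memory_info().rss for kid in kids)    # resident memory, in bytes
print("{0:d} sub-processes ~ {1:.2f} [GB] RSS in total".format(len(kids), rss / 1e9))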
So far so good; we have paid some time ( often, for carefully designed code, a reasonably negligible amount compared to fully re-training the whole predictor, isn't it? ) to spawn some number of parallel processes.
If the distributed workload next goes back to one common and performance-wise singular resource ( a disk directory tree with files ), the performance of the parallel processes is wrecked: each has to wait for such a resource(!) to become free again.
Finally, even with the "right" amount of Pool()-spawned sub-processes, such that it prevents the O/S from starting to SWAP RAM to disk and back, the inter-process communication is extremely expensive: here it means serialising ( pickling/unpickling ) + en-queueing + de-queueing all the DATA-objects one has to pass there and back ( yes, even for the callback fun ). So the less one sends, the faster the Pool-processing will become.

Here, all Pool-associated processes might benefit from independent logging of the results, as sketched below, which may reduce both the volume and the latency of the inter-process communications, while still consolidating the results reported by any number of workers into a common log.
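A minimal sketch of that per-worker logging idea ( the file naming is illustrative and evaluate_and_log is a hypothetical variant of the evaluate() above ):

import os

def evaluate_and_log(i, figure):
    status = prediction(cv2.imread(figure))
    # one log file per worker process (keyed by PID) avoids write contention
    with open('results_{0:d}.log'.format(os.getpid()), 'a') as log:
        log.write('{0:d};{1:s};{2:.6f}\n'.format(i, figure, status))
    return None                                     # nothing big to pickle back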
How to ... ? First benchmark the costs of each step:
Without hard facts ( measured durations in [us] ), one remains with just an opinion.
def prediction( img ):
    img = cv2.resize( img, ( 49, 49 ) )
    img = img.astype( 'float32' ) / 255
    img = np.reshape( img, [1, 49, 49, 3] )
    status = mymodel.predict( img )
    status = status[0][1]
    return( status )

def evaluate( i, figure ):  # predict the probability of the picture to be in class 0 or 1
    img = cv2.imread( figure )
    status = prediction( img )
    outcome = [figure, status]
    return( i, outcome )
#----------------------------------------------------------------------
from zmq import Stopwatch
aClk = Stopwatch()
aFigureNAME = listoffigurepaths[0]       # assumption: one representative image
#------------------------------------NOW THE COSTS OF THE ORIGINAL VERSION:
aListOfRESULTs = []
for iii in range( 100 ):
    #---------------------------------------------aClk-ed---------- SECTION
    aClk.start(); _ = evaluate( 1, aFigureNAME ); A = aClk.stop()
    #---------------------------------------------aClk-ed---------- SECTION
    print( "as-is took {0:}[us]".format( A ) )
    aListOfRESULTs.append( A )
#----------------------------------------------------------------------
print( [ aFun( aListOfRESULTs ) for aFun in ( np.min, np.mean, np.max ) ] )
#----------------------------------------------------------------------
Let's try something a bit different:
def eval_w_RAM_allocs_avoided( indexI, aFigureNAME ):
return [ indexI,
[ aFigureNAME,
mymodel.predict( ( cv2.resize( cv2.imread( aFigureNAME ),
( 49, 49 )
).astype( 'float32' ) / 255
).reshape( [1, 49, 49, 3]
)
)[0][1],
],
]
#------------------------------------NOW THE COSTS OF MOD-ed VERSION:
aListOfRESULTs = []
for iii in range( 100 ):
    #---------------------------------------------aClk-ed---------- SECTION
    aClk.start()
    _ = eval_w_RAM_allocs_avoided( 1, aFigureNAME )
    B = aClk.stop()
    #---------------------------------------------aClk-ed---------- SECTION
    print( "MOD-ed took {0:}[us] ~ {1:} x".format( B, float( B ) / A ) )
    aListOfRESULTs.append( B )
#----------------------------------------------------------------------
print( [ aFun( aListOfRESULTs ) for aFun in ( np.min, np.mean, np.max ) ] )
#----------------------------------------------------------------------
And the actual img pre-processing pipeline overhead costs:
#------------------------------------NOW THE COSTS OF THE IMG-PREPROCESSING
aListOfRESULTs = []
for iii in range( 100 ):
    #---------------------------------------------aClk-ed---------- SECTION
    aClk.start()
    aPredictorSpecificFormatIMAGE = ( cv2.resize( cv2.imread( aFigureNAME ),
                                                  ( 49, 49 )
                                                  ).astype( 'float32' ) / 255
                                      ).reshape( [1, 49, 49, 3] )
    C = aClk.stop()
    #---------------------------------------------aClk-ed---------- SECTION
    print( "IMG setup took {0:}[us] ~ {1:} of A".format( C, float( C ) / A ) )
    aListOfRESULTs.append( C )
#----------------------------------------------------------------------
print( [ aFun( aListOfRESULTs ) for aFun in ( np.min, np.mean, np.max ) ] )
#----------------------------------------------------------------------
Actual file-I/O ops:
#------------------------------------NOW THE COSTS OF THE IMG-FILE-I/O-READ
aListOfRESULTs = []
for iii in range( 100 ):
    #---------------------------------------------aClk-ed---------- SECTION
    aFileNAME = listoffigurepaths[158 + iii * 172]
    aClk.start()
    _ = cv2.imread( aFileNAME )
    F = aClk.stop()
    #---------------------------------------------aClk-ed---------- SECTION
    print( "aFileIO took {0:}[us] ~ {1:} of A".format( F, float( F ) / A ) )
    aListOfRESULTs.append( F )
#----------------------------------------------------------------------
print( [ aFun( aListOfRESULTs ) for aFun in ( np.min, np.mean, np.max ) ] )
#----------------------------------------------------------------------
Without these hard facts collected ( as a form of quantitative records of evidence ), one could hardly decide what the best performance-boosting step would be here for any massive-scale prediction-pipeline image processing.

Having these items tested, post the results, and the further steps ( be it going via multiprocessing.Pool, as sketched below, or using another strategy for larger performance scaling ) can first get reasonably evaluated, as the hard facts were first collected to do so.
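For illustration only, a minimal multiprocessing.Pool sketch, re-using the evaluate() defined above ( the process count and chunksize are placeholders, to be set from the benchmarks; on platforms without fork(), the model would have to be re-loaded per worker, as noted earlier ):

from multiprocessing import Pool

def evaluate_indexed(args):                         # top-level, so it pickles
    i, figure = args
    return evaluate(i, figure)

if __name__ == '__main__':
    with Pool(processes=8) as pool:                 # size it from the benchmarks
        # chunksize batches many small tasks into one IPC round-trip
        outcomes = list(pool.imap_unordered(evaluate_indexed,
                                            enumerate(listoffigurepaths),
                                            chunksize=32))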
Solution 3:
import os
import numpy as np
import matplotlib.pyplot as plt
from imageio import imread              # assumption: imread comes from imageio
from keras.preprocessing import image

img_height = 512      # Height of the input images
img_width = 512       # Width of the input images
img_channels = 3      # Number of color channels of the input images

orig_images = []      # Store the original images here.
batch_holder = np.zeros((20, img_height, img_width, 3))
img_dir = "path/to/image/"

for i, img in enumerate(os.listdir(img_dir)):
    img = os.path.join(img_dir, img)
    orig_images.append(imread(img))
    img = image.load_img(img, target_size=(img_height, img_width))
    batch_holder[i, :] = img

# 'model', 'decode_y' and 'parameter' come from the user's own detector setup
y_pred = model.predict(batch_holder)
y_pred_decoded = decode_y(y_pred, parameter)

np.set_printoptions(precision=2, suppress=True, linewidth=90)
print("Predicted boxes:\n")
print('   class   conf xmin   ymin   xmax   ymax')
print(y_pred_decoded[i])
# Display the image and draw the predicted boxes onto it.
# Set the colors for the bounding boxes.
colors = plt.cm.hsv(np.linspace(0, 1, 21)).tolist()
classes = ['background','class']
current_axis = plt.gca()
for box in y_pred_decoded[i]:
    xmin = box[-4] * orig_images[0].shape[1] / img_width
    ymin = box[-3] * orig_images[0].shape[0] / img_height
    xmax = box[-2] * orig_images[0].shape[1] / img_width
    ymax = box[-1] * orig_images[0].shape[0] / img_height
    color = colors[int(box[0])]
    label = '{}: {:.2f}'.format(classes[int(box[0])], box[1])
    current_axis.add_patch(plt.Rectangle((xmin, ymin), xmax - xmin, ymax - ymin,
                                         color=color, fill=False, linewidth=2))
    current_axis.text(xmin, ymin, label, size='x-large', color='white',
                      bbox={'facecolor': color, 'alpha': 1.0})

plt.imshow(orig_images[i])
plt.show()