Hi Owen, James,
Thank you both for your help!
By altering the original code I was able to get almost the same results as Owen:
//forked
Using 1 threads, time taken is: 35.332030118 - all process and wait
Using 12 threads, time taken is: 2.992354422 - all process and wait
Using 1 threads, time taken is: 6.481287311 - processBlocking
Using 12 threads, time taken is: 1.43672477 - processBlocking
//unforked
Time taken is: 6.42059408 - processBlocking
Time taken is: 35.808163976 - all process and wait
And here is that code.
(
// Version 1: fork one routine per slice; a Semaphore limits how many slices are
// processed at the same time. Each slice is copied out of the concatenated
// audio buffer, described with FluidBufSpectralShape, summarised with
// FluidBufStats, and statistic 0 (picked with FluidBufSelect) is stored in a
// FluidDataSet under the slice index.
s.waitForBoot({
	var num_threads = 1;
	// state shared between the main routine and the per-slice routines
	var thread = (
		\pool : Semaphore(num_threads),  // at most num_threads slices in flight
		\cond: Condition(),              // main routine hangs here until dataSet is full
		\startTime: Main.elapsedTime,
		\timeStage1: nil,                // set when novelty slicing has finished
		\loopStartTime: nil              // set when per-slice processing starts
	);
	// one large buffer: channel 0 of every .wav file found in ../AudioFiles/
	// (relative to the directory of FluidDataSet's class file), appended end to end
	var audio = {
		var dir = FluidDataSet.class.filenameSymbol;
		var filesToProcess = PathName(File.realpath(dir).dirname.withTrailingSlash ++ "../AudioFiles/");
		var wavs = filesToProcess.files.select({ |file| file.extension == "wav" });
		var bufs = wavs.collect{ |w| Buffer.readChannel(s, w.fullPath.asString, channels:[0]) };
		var return = Buffer.new(s);
		s.sync;
		bufs.do{
			|b|
			// append b at the current end of the destination buffer
			FluidBufCompose.process(s, source: b, destination: return, destStartFrame: return.numFrames).wait;
			b.free;
		};
		return.postln;
		return
	}.();
	// buffer of slice boundaries (frame indices) found by novelty slicing
	var slices = {
		var return = Buffer.new(s);
		s.sync;
		FluidBufNoveltySlice.process(s, audio, indices: return, threshold:0.5).wait;
		thread.timeStage1 = Main.elapsedTime;
		return.postln;
		return
	}.();
	var dataSet = FluidDataSet(s);
	// Analyse frames [start, end) of `audio` and add the result to dataSet as point
	// `index`. num_slices is the total number of slices; the routine whose size
	// query sees dataSet holding that many points wakes the main routine.
	var calc_spectral_mean_per_slice = {
		|start, end, index, num_slices|
		// Take a pool slot before creating any Buffers, so at most num_threads
		// routines hold buffers at once (they are freed just before signal).
		// sclang only allows `var` declarations at the top of a function, which
		// is why the wait lives in an initialiser.
		var pool_slot = thread.pool.wait;
		var num_frames = end - start;
		var spectral_mean = Buffer.new(s);
		var spectral_shape = Buffer.new(s);
		var spectral_stats = Buffer.new(s);
		var this_slice = Buffer.new(s);
		s.sync;
		// copy the slice out of the large buffer
		FluidBufCompose.process(
			s,
			source: audio,
			startFrame: start,
			numFrames: num_frames,
			destination: this_slice
		).wait;
		FluidBufSpectralShape.process(
			s,
			source: this_slice,
			features: spectral_shape
		).wait;
		FluidBufStats.process(
			s,
			source: spectral_shape,
			stats: spectral_stats
		).wait;
		// keep only statistic 0 of each feature
		FluidBufSelect.process(
			s,
			source: spectral_stats,
			destination: spectral_mean,
			indices: [0]
		).wait;
		dataSet.addPoint(index.asString, spectral_mean);
		dataSet.size( action: {
			|size|
			// this callback may run for several routines; only the one that
			// sees every slice present releases the main routine
			if (size >= num_slices, {
				thread.cond.unhang;
			})
		});
		if((index % 50) == 0, {
			"Finished Processing Slice % / %".format(index, num_slices).postln;
		});
		spectral_shape.free; spectral_stats.free; spectral_mean.free; this_slice.free;
		thread.pool.signal;
	};
	s.sync;
	slices.loadToFloatArray(action: {
		|slices_array|
		var num_slices;
		thread.loopStartTime = Main.elapsedTime;
		// make sure the last slice runs to the end of the audio
		if (slices_array.at(slices_array.size - 1) != audio.numFrames, {
			slices_array = slices_array.add(audio.numFrames);
		});
		// n boundaries delimit n - 1 slices
		num_slices = slices_array.size - 1;
		slices_array.doAdjacentPairs{
			|start, end, index|
			{
				calc_spectral_mean_per_slice.(start, end, index, num_slices)
			}.fork(SystemClock)
		};
		// with no slices nothing would ever call unhang, so only wait when there is work
		if (num_slices > 0, { thread.cond.hang });
		"Using % threads, time taken is: %".format(num_threads, Main.elapsedTime - thread.loopStartTime).postln;
		dataSet.print;
	});
});
)
… but my intention is to have many, many small slices, so I rewrote this to clump the slices together and distribute the clumps across the threads. The performance increase is quite impressive: about 11.3× faster with 12 threads (roughly 95% of the ideal 12×)!
Interestingly, if I change from processBlocking to process followed by wait, it just crashes…
I found that copying the slice out of the large audio buffer didn’t give any benefits in this case.
I have no idea why processBlocking is always significantly faster than process followed by wait. Do you have an example of the opposite?
Also, the CPU usage in htop never exceeds 100%, so this looks like concurrency rather than parallel processing.
Results
Using 1 threads, time taken is: 6.41734852 for 997 slices
Using 12 threads, time taken is: 1.718137513 for 997 slices
Using 1 threads, time taken is: 64.720428981 for 11081 slices
Using 12 threads, time taken is: 5.688826399 for 11081 slices
(
// Version 2: group the slices into clumps of roughly equal total length (about
// num_threads clumps) and process each clump sequentially in its own forked
// routine. The analysis commands are sent with processBlocking and without
// `.wait`; each routine only synchronises with the server through the per-slice
// s.sync and a final dataSet.size query.
// NOTE(review): CPU never exceeds 100%, so the server presumably runs these
// commands one after another; the gain appears to come from overlapping
// round-trips rather than from parallel computation — confirm.
s.waitForBoot({
	var num_threads = 12;
	// state shared between the main routine and the clump routines
	var thread = (
		\cond: Condition(),              // main routine hangs here until dataSet is full
		\startTime: Main.elapsedTime,
		\timeStage1: nil,                // set when novelty slicing has finished
		\loopStartTime: nil              // set when per-slice processing starts
	);
	// one large buffer: channel 0 of every .wav file found in ../AudioFiles/
	// (relative to the directory of FluidDataSet's class file), appended end to end
	var audio = {
		var dir = FluidDataSet.class.filenameSymbol;
		var filesToProcess = PathName(File.realpath(dir).dirname.withTrailingSlash ++ "../AudioFiles/");
		var wavs = filesToProcess.files.select({ |file| file.extension == "wav" });
		var bufs = wavs.collect{ |w| Buffer.readChannel(s, w.fullPath.asString, channels:[0]) };
		var return = Buffer.new(s);
		s.sync;
		bufs.do{
			|b|
			// append b at the current end of the destination buffer
			FluidBufCompose.process(s, source: b, destination: return, destStartFrame: return.numFrames).wait;
			b.free;
		};
		return.postln;
		return
	}.();
	// buffer of slice boundaries (frame indices) found by novelty slicing
	var slices = {
		var return = Buffer.new(s);
		s.sync;
		FluidBufNoveltySlice.process(s, audio, indices: return, threshold:0.05).wait;
		thread.timeStage1 = Main.elapsedTime;
		return.postln;
		return
	}.();
	var dataSet = FluidDataSet(s);
	// Analyse frames [start, end) of `audio` in place (startFrame/numFrames, no
	// copy) and add statistic 0 of the spectral shape to dataSet as point `index`.
	var spectral_mean_per_slice_lambda = {
		|start, end, index, total_num_slices|
		var num_frames = end - start;
		var spectral_mean = Buffer.new(s);
		var spectral_shape = Buffer.new(s);
		var spectral_stats = Buffer.new(s);
		s.sync;
		FluidBufSpectralShape.processBlocking(
			s,
			source: audio,
			startFrame: start,
			numFrames: num_frames,
			features: spectral_shape
		);
		FluidBufStats.processBlocking(
			s,
			source: spectral_shape,
			stats: spectral_stats
		);
		// keep only statistic 0 of each feature
		FluidBufSelect.processBlocking(
			s,
			source: spectral_stats,
			destination: spectral_mean,
			indices: [0]
		);
		dataSet.addPoint(index.asString, spectral_mean);
		if ((index % 50) == 0, {
			"Finished Processing Slice % / %".format(index, total_num_slices).postln;
		});
		spectral_mean.free; spectral_shape.free; spectral_stats.free;
	};
	// Split consecutive boundary pairs into clumps (Lists of slice Events). Each
	// clump is filled until its total frame count reaches targetSize, so there
	// are roughly num_threads clumps and none of them is empty unless there are
	// no slices at all (then a single empty clump is returned).
	var clump_slices_lambda = {
		|slices_array|
		var eventArray = List.new;
		var clumps = List[List.new];
		var current_size_v = 0;
		var targetSize;
		// one Event per slice; doAdjacentPairs yields exactly size - 1 pairs
		slices_array.doAdjacentPairs{
			|start, end, index|
			eventArray.add((\start_v: start, \end_v: end, \index_v: index, \size_v: end - start));
		};
		// sum of an empty collection is 0, so this also works with no slices
		targetSize = (eventArray.sum({ |ev| ev.size_v }) / num_threads).floor;
		eventArray.do{
			|ev|
			// start a new clump once the current (non-empty) one is full
			if ((current_size_v >= targetSize) and: { clumps.last.notEmpty }, {
				clumps.add(List.new);
				current_size_v = 0;
			});
			clumps.last.add(ev);
			current_size_v = current_size_v + ev.size_v;
		};
		clumps
	};
	// Process every slice of one clump in order, then query dataSet's size; the
	// routine whose query sees all numSlices points wakes the main routine.
	// NOTE(review): this relies on the size reply reflecting every addPoint sent
	// before it (in-order server processing) — confirm.
	var threaded_function_lambda = {
		|clump, numSlices|
		clump.do{
			|sl|
			spectral_mean_per_slice_lambda.(
				sl.start_v,
				sl.end_v,
				sl.index_v,
				numSlices
			);
		};
		dataSet.size( action: {
			|sz|
			// may run once per clump; only the reply with every slice present unhangs
			if (sz >= numSlices, {
				thread.cond.unhang;
			})
		});
	};
	s.sync;
	slices.loadToFloatArray(action: {
		|slices_array|
		var safe_slices_array, clumpedSlices, clumpSlicesSz;
		thread.loopStartTime = Main.elapsedTime;
		// Make sure the last slice runs to the end of the audio. Both branches
		// return an array (an `if` without an else branch would give nil here).
		safe_slices_array = if (slices_array.at(slices_array.size - 1) != audio.numFrames, {
			slices_array.add(audio.numFrames)
		}, {
			slices_array
		});
		clumpedSlices = clump_slices_lambda.(safe_slices_array);
		// total number of slices across all clumps
		clumpSlicesSz = clumpedSlices.sum({ |clump| clump.size });
		clumpedSlices.do{
			|clump|
			{ threaded_function_lambda.(clump, clumpSlicesSz) }.fork(SystemClock);
		};
		// there is always at least one clump, so some routine will call unhang
		thread.cond.hang;
		dataSet.size(action:{
			|sz|
			dataSet.dump;
			dataSet.print;
			"Using % threads, time taken is: % for % slices".format(num_threads, Main.elapsedTime - thread.loopStartTime, sz).postln;
		});
	});
});
)
I haven’t actually done anything with the results yet (I was going to put them into a k-d tree), but the debug output from the data set looks good.
Anyway… with these sorts of speeds I don’t think there is much point in a supernova version, and my problem is solved. Thank you!