Hi Owen, James,
thank you both for your help!
So, by altering the original code, I was able to get almost the same results as Owen:
//forked
Using 1 threads, time taken is: 35.332030118 - all process and wait
Using 12 threads, time taken is: 2.992354422 - all process and wait
Using 1 threads, time taken is: 6.481287311 - processBlocking
Using 12 threads, time taken is: 1.43672477 - processBlocking
//unforked
Time taken is: 6.42059408 - processBlocking
Time taken is: 35.808163976 - all process and wait
And here is that code.
(
// Version 1: one forked routine per novelty slice; a Semaphore caps how many
// of them run their FluCoMa analysis at the same time.
s.waitForBoot({
	var threadCount, state, audio, sliceBuf, dataSet, analyseSlice;

	threadCount = 1;
	// Shared bookkeeping: `pool` limits concurrency, `cond` parks the main
	// routine until the data set is full; the timestamps are for reporting.
	state = (
		\pool : Semaphore(threadCount),
		\cond: Condition(),
		\startTime: Main.elapsedTime,
		\timeStage1: nil,
		\loopStartTime: nil
	);

	// One large concatenated buffer: channel 0 of every .wav file found in
	// ../AudioFiles/ (relative to the FluCoMa class files), appended end to end.
	audio = {
		var classDir, folder, wavFiles, sourceBufs, joined;
		classDir = FluidDataSet.class.filenameSymbol;
		folder = PathName(File.realpath(classDir).dirname.withTrailingSlash ++ "../AudioFiles/");
		wavFiles = folder.files.select { |f| f.extension == "wav" };
		sourceBufs = wavFiles.collect { |w| Buffer.readChannel(s, w.fullPath.asString, channels: [0]) };
		joined = Buffer.new(s);
		s.sync;
		sourceBufs.do { |src|
			// append at the current end of `joined`, then release the source
			FluidBufCompose.process(s, source: src, destination: joined, destStartFrame: joined.numFrames).wait;
			src.free;
		};
		joined.postln;
		joined
	}.value;

	// Slice points of `audio` as computed by FluidBufNoveltySlice.
	sliceBuf = {
		var pointsBuf;
		pointsBuf = Buffer.new(s);
		s.sync;
		FluidBufNoveltySlice.process(s, audio, indices: pointsBuf, threshold: 0.5).wait;
		state.timeStage1 = Main.elapsedTime;
		pointsBuf.postln;
		pointsBuf
	}.value;

	dataSet = FluidDataSet(s);

	// Analyse frames [sliceStart, sliceEnd) of `audio` and store index 0 of the
	// FluidBufStats output (named "spectral mean" here) in `dataSet` under
	// index.asString. `boundaryCount` is the number of slice points, i.e. one
	// more than the number of slices.
	analyseSlice = { |sliceStart, sliceEnd, index, boundaryCount|
		var meanBuf, shapeBuf, statsBuf, sliceCopy;
		state.pool.wait; // take a slot; released by pool.signal at the end
		meanBuf = Buffer.new(s);
		shapeBuf = Buffer.new(s);
		statsBuf = Buffer.new(s);
		sliceCopy = Buffer.new(s);
		s.sync;
		FluidBufCompose.process(s, source: audio, startFrame: sliceStart, numFrames: sliceEnd - sliceStart, destination: sliceCopy).wait;
		FluidBufSpectralShape.process(s, source: sliceCopy, features: shapeBuf).wait;
		FluidBufStats.process(s, source: shapeBuf, stats: statsBuf).wait;
		FluidBufSelect.process(s, source: statsBuf, destination: meanBuf, indices: [0]).wait;
		dataSet.addPoint(index.asString, meanBuf);
		// Release the main routine once every slice has a point in the data set.
		dataSet.size(action: { |count|
			if (count == (boundaryCount - 1)) { state.cond.unhang };
		});
		if ((index % 50) == 0) {
			"Finished Processing Slide % / %".format(index, boundaryCount).postln;
		};
		shapeBuf.free; statsBuf.free; meanBuf.free; sliceCopy.free;
		state.pool.signal;
	};

	s.sync;
	sliceBuf.loadToFloatArray(action: { |points|
		state.loopStartTime = Main.elapsedTime;
		// Make sure the final slice is closed at the end of the audio.
		if (points.last != audio.numFrames) { points = points.add(audio.numFrames) };
		points.doAdjacentPairs { |sliceStart, sliceEnd, index|
			{ analyseSlice.value(sliceStart, sliceEnd, index, points.size) }.fork(SystemClock);
		};
		state.cond.hang;
		"Using % threads, time taken is: %".format(threadCount, Main.elapsedTime - state.loopStartTime).postln;
		dataSet.print;
	});
});
)
… but my intention is to have many, many small slices, so I rewrote this to clump the slices together and distribute the clumps across the threads. The performance increase is quite impressive: a speed-up of about 11.3× with 12 threads (roughly 95% of the ideal)!
Interestingly, if I change from `processBlocking` to `process` followed by `wait`, it just crashes…
I found that copying the slice out of the large audio buffer didn’t give any benefits in this case.
I have no idea why `processBlocking` is always significantly faster than `process` followed by `wait`. Do you have an example of the opposite?
Also, the CPU usage in htop does not exceed 100%, so this just looks like concurrency rather than parallel processing.
Results
Using 1 threads, time taken is: 6.41734852 for 997 slices
Using 12 threads, time taken is: 1.718137513 for 997 slices
Using 1 threads, time taken is: 64.720428981 for 11081 slices
Using 12 threads, time taken is: 5.688826399 for 11081 slices
(
// Version 2: split the novelty slices into clumps of roughly equal total
// length (about one per worker) and analyse each clump sequentially in its own
// forked routine using processBlocking.
s.waitForBoot({
	var num_threads = 12;
	// Shared bookkeeping. `cond` parks the main routine until every slice is in
	// the data set; `timeStage1` / `loopStartTime` are timestamps. `pool` and
	// `startTime` are not used by this version.
	var thread = (
		\pool : Semaphore(num_threads),
		\cond: Condition(),
		\startTime: Main.elapsedTime,
		\timeStage1: nil,
		\loopStartTime: nil
	);
	// one large concat.ed buffer: channel 0 of every .wav in ../AudioFiles/
	// (relative to the FluCoMa class files), appended end to end
	var audio = {
		var dir = FluidDataSet.class.filenameSymbol;
		var filesToProcess = PathName(File.realpath(dir).dirname.withTrailingSlash ++ "../AudioFiles/");
		var wavs = filesToProcess.files.select({ |file| file.extension == "wav" });
		var bufs = wavs.collect{ |w| Buffer.readChannel(s, w.fullPath.asString, channels:[0]) };
		var return = Buffer.new(s);
		s.sync;
		bufs.do{
			|b|
			FluidBufCompose.process(s, source: b, destination: return, destStartFrame: return.numFrames).wait;
			b.free;
		};
		return.postln;
		return
	}.();
	// slice points of `audio` computed by FluidBufNoveltySlice
	var slices = {
		var return = Buffer.new(s);
		s.sync;
		FluidBufNoveltySlice.process(s, audio, indices: return, threshold:0.05).wait;
		thread.timeStage1 = Main.elapsedTime;
		return.postln;
		return
	}.();
	var dataSet = FluidDataSet(s);
	// Analyse frames [start, end) of `audio` in place (no copy of the slice) and
	// add index 0 of the FluidBufStats output (named "spectral mean" here) to
	// `dataSet` under the key index.asString. total_num_slices is only used for
	// the progress message.
	var spectral_mean_per_slice_lambda = {
		|start, end, index, total_num_slices|
		var spectral_mean = Buffer.new(s);
		var spectral_shape = Buffer.new(s);
		var spectral_stats = Buffer.new(s);
		s.sync;
		FluidBufSpectralShape.processBlocking(
			s,
			source: audio,
			startFrame: start,
			numFrames: end - start,
			features: spectral_shape
		);
		FluidBufStats.processBlocking(
			s,
			source: spectral_shape,
			stats: spectral_stats
		);
		FluidBufSelect.processBlocking(
			s,
			source: spectral_stats,
			destination: spectral_mean,
			indices: [0]
		);
		dataSet.addPoint(index.asString, spectral_mean);
		// Posting every slice is costly with thousands of slices; report every
		// 50th, as the first version does.
		if ((index % 50) == 0, {
			"Finished Processing Slide % / %".format(index, total_num_slices).postln;
		});
		spectral_mean.free; spectral_shape.free; spectral_stats.free;
	};
	// Turn the slice points into events (\start_v, \end_v, \index_v, \size_v),
	// one per adjacent pair, and group consecutive events into clumps whose
	// total length is about (total length / num_threads). Returns a List of
	// non-empty Lists of events; empty when there are fewer than two points.
	var clump_slices_lambda = {
		|slices_array|
		var startArr = slices_array.drop(-1);
		var endArr = slices_array.drop(1);
		// one index per slice; index arrays must match startArr/endArr in length,
		// otherwise flop-style pairing wraps around and invents extra slices
		var eventArray = startArr.collect({
			|start, index|
			var end = endArr[index];
			(\start_v: start, \end_v: end, \index_v: index, \size_v: end - start)
		});
		var targetSize = ((endArr - startArr).sum / num_threads).floor;
		var clumps = List[List.new];
		var current_size_v = 0;
		eventArray.do({
			|ev|
			// the event always counts towards the clump it is placed in
			clumps.last.add(ev);
			current_size_v = current_size_v + ev.size_v;
			if (current_size_v >= targetSize, {
				clumps.add(List.new); // close this clump, start a fresh one
				current_size_v = 0;
			});
		});
		clumps.reject({ |c| c.isEmpty })
	};
	// Worker body: analyse every slice in `clump`, then, once `dataSet` holds
	// all numSlices points, release the main routine.
	var threaded_function_lambda = {
		|clump, numSlices|
		clump.do{
			|sl|
			spectral_mean_per_slice_lambda.(
				sl.start_v,
				sl.end_v,
				sl.index_v,
				numSlices
			);
		};
		dataSet.size( action: {
			|sz|
			// require every point: `numSlices - 1` could release the main routine
			// while another worker still has a slice outstanding
			if (sz >= numSlices, {
				thread.cond.unhang;
			})
		});
	};
	s.sync;
	slices.loadToFloatArray(action: {
		|slices_array|
		var safe_slices_array, clumpedSlices, clumpSlicesSz;
		thread.loopStartTime = Main.elapsedTime;
		// Close the last slice at the end of the audio. Both branches must yield
		// the array: an `if` without a false branch evaluates to nil.
		safe_slices_array = if (slices_array.last != audio.numFrames, {
			slices_array.add(audio.numFrames)
		}, {
			slices_array
		});
		clumpedSlices = clump_slices_lambda.(safe_slices_array);
		clumpSlicesSz = clumpedSlices.inject(0, {|count, a| count + a.size});
		// With no slices no worker would ever unhang `cond`, so only wait when
		// there is work to do.
		if (clumpSlicesSz > 0, {
			clumpedSlices.do{
				|clump|
				{ threaded_function_lambda.(clump, clumpSlicesSz) }.fork(SystemClock);
			};
			thread.cond.hang;
		});
		dataSet.size(action:{
			|sz|
			dataSet.dump;
			dataSet.print;
			"Using % threads, time taken is: % for % slices".format(num_threads, Main.elapsedTime - thread.loopStartTime, sz).postln;
		});
	});
});
)
I haven’t actually done anything with the results yet — I was going to put them into a k-d tree — but the debug output from the data set looks good.
Anyway… with these sorts of speeds I don’t think there is all that much point in a supernova version, and my problem is solved. Thank you!