Supernova/Supercollider - unable to find cmd plugin error

Hi,
does anybody know if FluCoMa works with supernova? Normally, I don’t use supernova as I rarely do anything demanding enough to see a performance increase so I might have the installation wrong.

Server.scsynth;
s.boot;
FluidDataSet(s); // works, so does everything else I have tested
Server.supernova;
s.boot;
FluidDataSet(s); // does not work, and I get the same error for all other classes
//error:  unable to find cmd plugin: FluidDataSet/new

Ultimately I wanted to compute descriptors for a large buffer in chunks, and pop them into a data set.

Specs:
Ryzen 5 (5600X), Manjaro Linux, SuperCollider built from source (with system Boost), same for FluCoMa, both main branch.

Hey @Jordan and welcome.

I’m not the SuperCollider buff here, but I can probably help to a degree. There is currently no supernova support. I was talking to @francesco.cameli about this sort of thing recently, as I’ve been coding several examples of batch processing in SuperCollider myself. I also can’t speak to what work has been done in the past and whether any speed improvements were found with a test compile against supernova; it might not be worth the time spent getting it to compile just for the speed increase.

So my interim suggestion would be: experiment with how fast you can get it on scsynth, and report back if you’re hitting a wall with the processing?

I’ve attached two scripts that both deal with different kinds of batch processing (decomposition and analysis). You may be more interested to start with the first script (batch-slicing-threads) which analyses segments of a sound file with audio descriptors and puts the result into a dataset. The processing loop spawns several threads and controls the number with a Semaphore to stop the server from being sad. With a CPU like yours, you could try cranking the number of concurrent jobs up pretty high without things derailing.

batch-slicing-threads.scd (5.8 KB)
batch-decomposition-threads.scd (3.4 KB)
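
For flavour, here is a minimal sketch of the Semaphore-limited pattern those scripts use (the path, the descriptor choice and the job count below are placeholders, not a verbatim copy of the attachments):

(
fork {
	var maxJobs = 4;                      // how many server jobs may run at once
	var pool = Semaphore(maxJobs);
	var done = Condition();
	var finished = 0;
	var files = PathName("/path/to/audio/".standardizePath).files;

	files.do { |file|
		fork {
			var src, features;
			pool.wait;                    // block until a "slot" is free
			src = Buffer.readChannel(s, file.fullPath, channels: [0]);
			features = Buffer.new(s);
			s.sync;
			FluidBufPitch.process(s, source: src, features: features).wait;
			// ...stash the results in a dataset or dictionary here...
			[src, features].do(_.free);
			finished = finished + 1;
			if (finished == files.size) { done.unhang };
			pool.signal;                  // release the slot
		};
	};
	done.hang;
	"all % files processed".format(files.size).postln;
};
)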

Let me know how that goes and if I can help you in any other ways!

Hi James,
honestly I didn’t know supercollider could do this, literally never had a need to before!

Also thanks for the quick reply, had a problem getting FluCoMa to work earlier this week and asked on github, and got a super fast response there too!

I tweaked the decomposition example you sent to get some comparable output, and I get:

Using 1 threads, time taken is: 6.70731677
  ...
Using 12 threads, time taken is: 1.433796945, for a comparative boost of 4.6780102254995

Not a bad result given there is a bunch of buffers being created and freed during the process! In htop, scsynth’s CPU usage briefly spikes at 1000% (for the 12 threads). I didn’t know that scsynth could use more than one thread, so that’s great! However…

… I was asking because I was looking at the example (copied at the bottom) from FluidProcessSlices, which is run with .play and takes a tasks argument, but it only uses 1 thread regardless.

Does anybody know why this is? Is it because of the `.kr` method? If so, should the documentation for FluidProcessSlices be changed to a buffer-based process, as I can’t see much point in it otherwise (unless I am missing a use case)?

s.boot;
s.quit
//Load all the Fluid Corpus Manipulation audio files

(
~path = File.realpath(FluidLoadFolder.class.filenameSymbol).dirname +/+ "../AudioFiles";
~loader = FluidLoadFolder(~path);
~loader.play(s,action:{ |dataDictionary| "Done loading".postln});
~slicer = FluidSliceCorpus({ |src,start,num,dest|
        FluidBufOnsetSlice.kr(src,start,num,indices:dest, threshold:2)
});
~pitchdata = FluidDataSet(s);
~pitchbufs = 4.collect{Buffer.new};
~statsbufs = 4.collect{Buffer.new};
)

//segment
~slicer.play(s,~loader.buffer,~loader.index,{|dataDictionary| "Slicing done".postln;});

//In the interests of brevity, let's just take a subset of the slices and process these
~subset = IdentityDictionary.newFrom(~slicer.index.asSortedArray[0..7].flatten(1));

//write pitch statistics into a dataset
//define the extraction function...
(
~extractor = FluidProcessSlices({|src,start,num,data|
    var pitch, stats, identifier,i;
    i = data.value[\voice];
    identifier = data.key;
    pitch = FluidBufPitch.kr(src,start,num,features:~pitchbufs[i]);
    stats = FluidBufStats.kr(~pitchbufs[i],stats:~statsbufs[i],trig:Done.kr(pitch));
    FluidDataSetWr.kr(~pitchdata, identifier, nil, buf:~statsbufs[i], trig:Done.kr(stats))
});
)

//... and run it
~extractor.play(s, 
	sourceBuffer: ~loader.buffer, 
	bufIdx: ~slicer.index, 
	action: {"Feature extraction done".postln},
	tasks: 12
);

//view the data
~pitchdata.print

Okay so I’m confused again…

whilst the batch-decomposition-threads.scd example worked great and will run on many cores (htop sees scsynth’s CPU usage at 1000+% with 13 threads), I can’t modify the example to do what I want.

I’ve put my code below, but I’m trying to:

  1. concat a bunch of buffers together,
  2. then slice them,
  3. and finally add the mean of the centroid-per-slice into a data set.

I thought step 3 would be done in parallel given the code below, but it only runs on one core (CPU usage hits 80% with 1 thread, and 100% with 13). My hypothesis is that it thinks the audio variable is mutated, so the whole function blocks. I tried commenting out the use of the dataset in the loop, but that didn’t change anything; besides, the batch-decomposition example added to a Dictionary.

Rather stumped on how to proceed…

(
s.waitForBoot({
	var num_threads = 13;
	// one large concat.ed buffer
	var audio = {
		var dir = FluidDataSet.class.filenameSymbol;
		var filesToProcess = PathName(File.realpath(dir).dirname.withTrailingSlash ++ "../AudioFiles/");
		var wavs = filesToProcess.files.select({ |file| file.extension == "wav" });
		var bufs = wavs.collect{ |w| Buffer.readChannel(s, w.fullPath.asString, channels:[0]) };
		var return = Buffer.new(s);
		s.sync;
		bufs.do{
			|b|
			FluidBufCompose.process(s, source: b, destination: return, destStartFrame: return.numFrames).wait;
			b.free;
		};
		return
	}.();

	var slices = {
		var return = Buffer.new(s);
		s.sync;
		FluidBufNoveltySlice.process(s, audio, indices: return, threshold:0.5).wait;
		return
	}.();

	var thread = (
		\pool : Semaphore(num_threads),
		\cond: Condition(),
		\startTime: Main.elapsedTime,
	);

	var dataSet = FluidDataSet(s);

	var calc_spectral_mean_per_slice = {
		|start, end, index, total_num_slices|
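		// (the dummy var lets us grab a semaphore slot before any real work, while
		// keeping SuperCollider's rule that var declarations come first)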
		var out_of_order_var_request_1 = thread.pool.wait;
		var num_frames = end - start;
		var spectral_mean = Buffer.new(s);
		var spectral_shape = Buffer.new(s);
		var spectral_stats = Buffer.new(s);
		s.sync;
		FluidBufSpectralShape.process(
			s,
			source: audio,
			startFrame: start,
			numFrames: num_frames,
			features: spectral_shape
		).wait;
		FluidBufStats.process(
			s,
			source: spectral_shape,
			stats: spectral_stats,
		).wait;
		FluidBufSelect.process(
			s,
			source: spectral_stats,
			destination: spectral_mean,
			indices: [0]
		).wait;

		dataSet.addPoint(index.asString, spectral_mean);
		dataSet.size( action: {
			|size|
			if (size == (total_num_slices - 1), {
				thread.cond.unhang
			})
		});
		"Finished Processing Slide % / %".format(index, total_num_slices).postln;
		spectral_shape.free; spectral_stats.free; spectral_mean.free;
		thread.pool.signal;
	};

	s.sync;

	
	slices.loadToFloatArray(action: {
		|slices_array|
		if (slices_array.at(slices_array.size - 1) != audio.numFrames, {
			slices_array = slices_array.add(audio.numFrames); 
		});
		slices_array.doAdjacentPairs{
			|start, end, index|
// here's the fork
			{calc_spectral_mean_per_slice.(start, end, index, slices_array.size())}.fork(SystemClock)
		};
		thread.cond.hang;
		"Using % threads, time taken is: %".format(num_threads, Main.elapsedTime - thread.startTime).postln;
		dataSet.print;
	});
	
});
)

I’m not currently at my station but maybe @tedmoore or @weefuzzy can help you in the interim?

I’m not at my mental peak :laughing: But this is where we start to run into the limitations of our implementation, plus the fact that reasoning about threads is just a bit gnarly.

  • Limitations of our stuff: doing a whole bunch of little threaded jobs on slices of a big ass source buffer is a pathological case for FluCoMa just at the moment. Basically, the bits of code that deal with launching the asynchronous work (and copying data from the source buffer for safety) don’t know that they only need a teeny little slice of the source, so a lot of time gets wasted copying data that doesn’t need copying. This will be fixed in a later version, but needs a little bit of redesign.
  • Also, launching lots of new threads has a certain amount of overhead: having a pool on the server would be better, but that’s a harder problem to solve.
  • Which means that lots and lots of tiny jobs in threads can end up being slower than just running them synchronously, because the overhead of scheduling etc. starts to swamp the actual work being done.

Your code is definitely launching threads on my machine when I look in the profiler. Lots of 'em! But very very short-lived.

With your code, if I get rid of threading entirely by

  1. Changing all the process calls to processBlocking (and getting rid of wait as it’s now redundant)
  2. Getting rid of the fork (so the semaphore and condition also become redundant, but I left them)

then I get a speed up from ~44s to ~2s
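
Roughly, the de-threaded loop ends up looking something like this (a sketch using the variable names from your code, not the exact diff of what I ran):

slices.loadToFloatArray(action: { |slices_array|
	fork {
		if (slices_array.last != audio.numFrames) {
			slices_array = slices_array.add(audio.numFrames);
		};
		slices_array.doAdjacentPairs { |start, end, index|
			var num_frames = end - start;
			var spectral_shape = Buffer.new(s);
			var spectral_stats = Buffer.new(s);
			var spectral_mean = Buffer.new(s);
			// processBlocking runs each job in turn on the server command thread,
			// so there is nothing to .wait for between the stages
			FluidBufSpectralShape.processBlocking(s, source: audio,
				startFrame: start, numFrames: num_frames, features: spectral_shape);
			FluidBufStats.processBlocking(s, source: spectral_shape, stats: spectral_stats);
			FluidBufSelect.processBlocking(s, source: spectral_stats,
				destination: spectral_mean, indices: [0]);
			dataSet.addPoint(index.asString, spectral_mean);
			[spectral_shape, spectral_stats, spectral_mean].do(_.free);
		};
		s.sync;
		"Time taken is: %".format(Main.elapsedTime - thread.startTime).postln;
		dataSet.print;
	};
});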

Alternatively, I can improve the threaded case by using FluidBufCompose#processBlocking to just copy out the slice before FluidBufSpectralShape, and things go to ~7s. (So the difference is thread scheduling and copying overhead.)

So I added / changed this in calc_spectral_mean_per_slice

		var slice = Buffer.new(s);

		FluidBufCompose.processBlocking(
			s,
			source: audio,
			startFrame: start,
			numFrames: num_frames,
			destination: slice
		);

		FluidBufSpectralShape.process(
			s,
			source: slice,
			features: spectral_shape
		).wait;

The take-home is that processing small slices is often faster done synchronously, unless you need to do lots of it and there’s an obvious way to parallelise it without too much thread-management overhead. Processes like taking stats over a few samples are so cheap, relatively speaking, that it’s very often not worth doing them in a thread compared to the time cost of locks, context switches and the rest of it.

Hi Owen, James,
thank you both for your help!

So, altering the original code, I was able to get almost the same results as Owen:

//forked
Using 1 threads, time taken is: 35.332030118 - all process and wait
Using 12 threads, time taken is: 2.992354422 - all process and wait

Using 1 threads, time taken is: 6.481287311 - processBlocking
Using 12 threads, time taken is: 1.43672477 - processBlocking

//unforked
Time taken is: 6.42059408  - processBlocking
Time taken is: 35.808163976 - all process and wait

And here is that code.

(
s.waitForBoot({
	var num_threads = 1;
	
	var thread = (
		\pool : Semaphore(num_threads),
		\cond: Condition(),
		\startTime: Main.elapsedTime,
		\timeStage1: nil,
		\loopStartTime: nil
	);
	
	// one large concat.ed buffer
	var audio = {
		var dir = FluidDataSet.class.filenameSymbol;
		var filesToProcess = PathName(File.realpath(dir).dirname.withTrailingSlash ++ "../AudioFiles/");
		var wavs = filesToProcess.files.select({ |file| file.extension == "wav" });
		var bufs = wavs.collect{ |w| Buffer.readChannel(s, w.fullPath.asString, channels:[0]) };
		var return = Buffer.new(s);
		s.sync;
		bufs.do{
			|b|
			FluidBufCompose.process(s, source: b, destination: return, destStartFrame: return.numFrames).wait;
			b.free;
		};
		return.postln;
		return
	}.();
	
	var slices = {
		var return = Buffer.new(s);
		s.sync;
		FluidBufNoveltySlice.process(s, audio, indices: return, threshold:0.5).wait;
		thread.timeStage1 = Main.elapsedTime;
		return.postln;
		return
	}.();
	
	var dataSet = FluidDataSet(s);
	
	var calc_spectral_mean_per_slice = {
		|start, end, index, total_num_slices|
		var out_of_order_var_request_1 = thread.pool.wait;
		var num_frames = end - start;
		var spectral_mean = Buffer.new(s);
		var spectral_shape = Buffer.new(s);
		var spectral_stats = Buffer.new(s);
		var this_slice = Buffer.new(s);
		s.sync;
		FluidBufCompose.process(
			s,
			source:audio,
			startFrame:start,
			numFrames: num_frames,
			destination: this_slice
		).wait;
		FluidBufSpectralShape.process(
			s,
			source: this_slice,
			features: spectral_shape
		).wait;
		FluidBufStats.process(
			s,
			source: spectral_shape,
			stats: spectral_stats,
		).wait;
		FluidBufSelect.process(
			s,
			source: spectral_stats,
			destination: spectral_mean,
			indices: [0]
		).wait;
		
		dataSet.addPoint(index.asString, spectral_mean);
		dataSet.size( action: {
			|size|
			if (size == (total_num_slices - 1), {
				thread.cond.unhang;
			})
		});
		if((index % 50) == 0,{
			"Finished Processing Slide % / %".format(index, total_num_slices).postln;
		});
		spectral_shape.free; spectral_stats.free; spectral_mean.free;this_slice.free;
		thread.pool.signal;
	};
	
	s.sync;
	
	slices.loadToFloatArray(action: {
		|slices_array|
		thread.loopStartTime = Main.elapsedTime;
		if (slices_array.at(slices_array.size - 1) != audio.numFrames, {
			slices_array = slices_array.add(audio.numFrames);
		});
		slices_array.doAdjacentPairs{
			|start, end, index|
			{
				calc_spectral_mean_per_slice.(start, end, index, slices_array.size())
			}.fork(SystemClock)
		};
		thread.cond.hang;
		"Using % threads, time taken is: %".format(num_threads, Main.elapsedTime - thread.loopStartTime).postln;
		dataSet.print;
	});
	
});
)

… but my intention is to have many, many small slices, so I rewrote this to clump the slices together and distribute them across the threads, and the performance increase is quite impressive, at 11.3x (95%)!

Interestingly if I change from processBlocking to process then wait, it just crashes…

I found that copying the slice out of the large audio buffer didn’t give any benefits in this case.

I have no idea why processBlocking is always significantly faster than process then wait; do you have an example of the opposite?

Also, the CPU usage in htop does not exceed 100%, so this just looks like concurrency rather than parallel processing.

Results


Using 1 threads, time taken is: 6.41734852 for 997 slices
Using 12 threads, time taken is: 1.718137513 for 997 slices

Using 1 threads, time taken is: 64.720428981 for 11081 slices
Using 12 threads, time taken is: 5.688826399 for 11081 slices

(
s.waitForBoot({
	var num_threads = 12;

	var thread = (
		\pool : Semaphore(num_threads),
		\cond: Condition(),
		\startTime: Main.elapsedTime,
		\timeStage1: nil,
		\loopStartTime: nil
	);

	// one large concat.ed buffer
	var audio = {
		var dir = FluidDataSet.class.filenameSymbol;
		var filesToProcess = PathName(File.realpath(dir).dirname.withTrailingSlash ++ "../AudioFiles/");
		var wavs = filesToProcess.files.select({ |file| file.extension == "wav" });
		var bufs = wavs.collect{ |w| Buffer.readChannel(s, w.fullPath.asString, channels:[0]) };
		var return = Buffer.new(s);
		s.sync;
		bufs.do{
			|b|
			FluidBufCompose.process(s, source: b, destination: return, destStartFrame: return.numFrames).wait;
			b.free;
		};
		return.postln;
		return
	}.();

	var slices = {
		var return = Buffer.new(s);
		s.sync;
		FluidBufNoveltySlice.process(s, audio, indices: return, threshold:0.05).wait;
		thread.timeStage1 = Main.elapsedTime;
		return.postln;
		return
	}.();

	var dataSet = FluidDataSet(s);


	var spectral_mean_per_slice_lambda = {
		|start, end, index, total_num_slices, this_data_set|
		var num_frames = end - start;
		var spectral_mean = Buffer.new(s);
		var spectral_shape = Buffer.new(s);
		var spectral_stats = Buffer.new(s);
		var this_slice = Buffer.new(s);
		s.sync;

		FluidBufSpectralShape.processBlocking(
			s,
			source: audio,
			startFrame:start,
			numFrames: num_frames,
			features: spectral_shape
		);
		FluidBufStats.processBlocking(
			s,
			source: spectral_shape,
			stats: spectral_stats,
		);
		FluidBufSelect.processBlocking(
			s,
			source: spectral_stats,
			destination: spectral_mean,
			indices: [0]
		);

		dataSet.addPoint(index.asString, spectral_mean);

		"Finished Processing Slide % / %".format(index, total_num_slices).postln;
		spectral_mean.free; spectral_shape.free; spectral_stats.free; this_slice.free;
	};

	var clump_slices_lambda = {
		|slices_array|
		var startArr = slices_array.slice((0..(slices_array.size - 2)));
		var endArr = slices_array.slice((1..(slices_array.size - 1)));
		var indexArr = (0..(slices_array.size - 1));
		var eventArray = [startArr, endArr, indexArr].flopWith({
			|start, end, index|
			(\start_v: start,  \end_v: end, \index_v: index,  \size_v: end - start)
		});
		var targetSize = ( (endArr-startArr).reduce('+') / num_threads).floor;
		var current_size_v = 0;
		// array of clumped slices
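		// (greedy clumping: keep appending slices to the last clump until it holds
		// roughly 1/num_threads of the total frames, then start a new clump, so each
		// thread gets a similar amount of audio rather than a similar slice count)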
		eventArray.inject([[]] , {
			|arr, ev|
			current_size_v = current_size_v + ev.size_v;
			current_size_v.postln;
			if(current_size_v >= targetSize, {
				current_size_v = 0;
				arr ++ [[ev]] // append a new array, reseting current clump size
			}, {
				arr.collect{|a, index| if(index == (arr.size - 1), {a ++ [ev] }, { a })} // append to last inner array
			});
		});
	};

	var threaded_function_lambda = {
		|clump, numSlices|
		clump.do{
			|sl|
			spectral_mean_per_slice_lambda.(
				sl.start_v,
				sl.end_v,
				sl.index_v,
				numSlices
			);
		};
		dataSet.size( action: {
			|sz|
			if (sz >= (numSlices - 1), {
				thread.cond.unhang;
			})
		});
	};

	s.sync;

	slices.loadToFloatArray(action: {
		|slices_array|
		var out_of_line_var_1 = thread.loopStartTime = Main.elapsedTime;
		var safe_slices_array = if (slices_array.at(slices_array.size - 1) != audio.numFrames, {
			slices_array.add(audio.numFrames);
		}, {
			slices_array
		});

		var clumpedSlices = clump_slices_lambda.(safe_slices_array);

		var clumpSlicesSz = clumpedSlices.inject(0, {|count, a| count + a.size});

		clumpedSlices.do{
			|clump|
			{ threaded_function_lambda.(clump, clumpSlicesSz) }.fork(SystemClock);
		};

		thread.cond.hang;

		dataSet.size(action:{
			|sz|
			dataSet.dump;
			dataSet.print;
			"Using % threads, time taken is: % for % slices".format(num_threads, Main.elapsedTime - thread.loopStartTime, sz).postln;
		});
	});
});
)

I haven’t actually done anything with the results, I was going to pop them into a k-d tree, but the debug output from the data set looks good.

Anyway… with these sorts of speeds I don’t think there is all that much point in a supernova version, and my problem is solved. Thank you!

Nice one. Glad you’re making such significant headway with the tools so fast!

I have found this to be true as well. I used to use process and wait a lot because it feels more naturally SuperCollider to handle this kind of stuff in the language, but I’ve completely switched over to processBlocking now!

If batch processes get very large, it is also possible to spawn a few servers and disperse tasks across them (Supernova-y without using Supernova), which works quite well. Note you’ll have to create a different FluidDataSet for each server and concatenate them later.
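
Something like this, as a rough sketch (the server names, ports, path and descriptor below are placeholders, and you’d still need to merge the resulting datasets afterwards):

// boot a few extra servers
(
~servers = 3.collect { |i|
	Server(("batch" ++ i).asSymbol, NetAddr("127.0.0.1", 57200 + i))
};
~servers.do(_.boot);
)

// once they have all booted, round-robin the jobs across them
(
~datasets = ~servers.collect { |srv| FluidDataSet(srv) };
~files = PathName("/path/to/audio/".standardizePath).files;
~files.do { |file, i|
	var srv = ~servers.wrapAt(i);
	fork {
		var src = Buffer.readChannel(srv, file.fullPath, channels: [0]);
		var feat = Buffer.new(srv);
		var stats = Buffer.new(srv);
		srv.sync;
		FluidBufSpectralShape.processBlocking(srv, source: src, features: feat);
		FluidBufStats.processBlocking(srv, source: feat, stats: stats);
		~datasets.wrapAt(i).addPoint(file.fileName, stats);
		srv.sync;
		[src, feat, stats].do(_.free);
	};
};
)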

Because there’s a (relatively) fixed amount of overhead involved in making and launching threads, plus an O(n) overhead due to copying input data, which for relatively short processing tasks becomes more consequential (i.e. it will take noticeably longer).

The option for threaded processing was added to stop the scsynth command thread getting gummed up by algorithms that take longer – either processing lots of data in one go, or because the algorithm is quite heavy (e.g. FluidBufNMF). If you run NMF with processBlocking on a sound of even moderate length, you’ll see the IDE status bar go yellow because no status message has been sent from the server for some time.
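
For example, something heavy like NMF is a case where the threaded route earns its keep, whereas a tiny stats job is cheaper done blocking. A sketch (the sound file is just SC’s bundled example):

(
fork {
	var src = Buffer.read(s, Platform.resourceDir +/+ "sounds/a11wlk01.wav");
	var bases = Buffer.new(s);
	var stats = Buffer.new(s);
	s.sync;
	// long-running: runs on its own server thread; .wait parks this routine until it reports back
	FluidBufNMF.process(s, src, bases: bases, components: 5).wait;
	"NMF done".postln;
	// short and cheap: just queue it on the command thread
	FluidBufStats.processBlocking(s, source: bases, stats: stats);
	s.sync;
	"stats done".postln;
};
)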

Where threads are more likely to save you some time is if you want to run a bunch of longish tasks all at once. In that case, it’s more likely that they might end up benefiting from multiple cores. For lots and lots of small jobs, it’s almost always going to be slower: even if the small jobs do end up on separate cores (of which there is no guarantee), they are so short-lived that, as I say, the low-level interaction with the OS thread scheduler and whatever the hardware itself does is going to be noticeable.

This is related to your CPU load query above too: churning through the 1000 or so tiny jobs, I can see all the threads appear in my scheduler, but they’re so short lived that the chances of multiple cores being fully engaged for any significant amount of time is pretty low.

Yup, I just had a look at the older example and saw how many real threads it was making! Ha! The thing is, I’m quite familiar with using threads: I do quite a bit of C++ programming (currently making a Qt Quick program), but the SuperCollider docs all say that the Thread object isn’t a real thread, so I just assumed it would use a pool.

Does process then wait work like a promise/future then?

Even though in the last example I was able to get an 11x increase by using 12 ‘threads’, this wasn’t multicore processing, just better concurrency. Scsynth never spawned any more threads. The only example where I saw multicore processing was the decomposition one.

Yeah, the threads we manage in scsynth are completely independent of the not-real threads in the language. Using fork and a Semaphore in the language is just there to give a way of keeping a limit on how many real threads we fire off on the server – too many, and performance will degrade system-wide quite quickly :grimacing: (another reason having a real pool server-side would be good).

Does process then wait work like a promise/future then?

.wait doesn’t really fulfil all the criteria of a proper future-promise pair, but it has some of the foundations of it. All it’s doing is using a Condition to wait for the server to report that a job is finished so that one can avoid having too many nested action callbacks if desired.
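
In plain SuperCollider terms, the underlying pattern is roughly this (a sketch of the idea, not FluCoMa’s actual implementation):

(
fork {
	var done = Condition(false);
	var buf = Buffer.read(s, Platform.resourceDir +/+ "sounds/a11wlk01.wav", action: {
		done.test = true;
		done.signal;    // wake the waiting routine
	});
	done.wait;           // park here until the action above fires
	"buffer loaded, % frames".format(buf.numFrames).postln;
};
)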

Proper future-promises in the language might be interesting though. Or perhaps continuations rather than callbacks.

The only example where I saw multicore processing was the decomposition one.

It makes some sense to me that you might only see more obvious use of multiple cores in the decomposition example because the parcels of work there are bigger and heavier. The disposition of where work actually gets done core-wise is, I guess, mostly down to the OS thread scheduler, which is probably doing some moderately clever stuff to try and figure out what’s going on and spread the load accordingly. With lots of tweaky little jobs, I’d suppose that there isn’t much opportunity for clever load balancing. Which is probably yet another reason why a pool on the server would be good :laughing:

Interestingly if I change from processBlocking to process then wait, it just crashes…

Forgot to say above: if you can tell me what you changed to make it crash, I’d be grateful (so I can fix it!)

I wondered where your fluency was coming from :slight_smile: I really enjoy this thread (pun intended) as I think it will demystify loads of assumptions people (including me) might have about threading. Despite @weefuzzy teaching me many times (he is patient) and my own experience, I keep having the hunch that many parallel jobs should be better… intuitions here can be misleading!

Now, as explained above, our “small sections of a large buffer” memory management for threaded jobs is not ideal at the moment either, but as the bufcompose example shows, it will still be slower in this case.