
parallel library

Posted: 07/17/08 15:39:08

I have been messing around with the idea of a parallel library for a while. Since D doesn't have built-in support for this, I came up with a way to use Tango's ThreadPool.

import 	tango.io.Stdout,
	tango.core.ThreadPool,
	tango.time.StopWatch,
	tango.core.Thread;


private ThreadPool!() pool;

static this()
{
	setPool;
}

void setPool(uint size = 12)
{
	pool = new ThreadPool!()(size);
}

//Parallel block
void p_block(void delegate()[] blocks ...)
{
	foreach(blk; blocks)
		pool.assign(blk);
		
	waitThreads;
}

private void waitThreads()
{
	while(pool.activeJobs || pool.pendingJobs)
		Thread.sleep(0);
}

All the magic is in p_block. You use it like this:

p_block(
{
  // Code A: process input
},
{
  // Code B: process game sound
},
{
  // Code C: process AI
});

//Process Physics
//Render

Thus blocks A, B, and C will all run in their own threads, and p_block will wait until they're done. Of course, A, B, and C can't depend on each other, and they should avoid locking!
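To make that independence rule concrete, here is a minimal sketch (assuming the p_block above): each block writes only to its own accumulator, so no locking is needed, and the results are combined only after p_block has joined all blocks.

```d
import tango.io.Stdout;

void parallelSum()
{
	int[] data = new int[1_000_000];
	foreach (i, ref v; data)
		v = cast(int)(i & 7);

	int s1, s2;                     // one accumulator per block: no sharing
	size_t mid = data.length / 2;

	p_block(
		{ foreach (v; data[0 .. mid]) s1 += v; },
		{ foreach (v; data[mid .. $]) s2 += v; }
	);

	int total = s1 + s2;            // safe: both blocks are done here
	Stdout(total).newline;
}
```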

My test code..

void main()
{
	int a, b, c, d, e, f, g, h, i, j, k, l;

	Stdout("testing").newline;

	StopWatch w1;
	w1.start;
	
	p_block
	(
		{ for(a = 0; a < 10003201; ++a){} }
		,
		{ for(b = 0; b < 80213002; ++b){} }
		,
		{ for(c = 0; c < 561123423; ++c){} }
		,
		{ for(d = 0; d < 10003201; ++d){} }
		,
		{ for(e = 0; e < 80213002; ++e){} }
		,
		{ for(f = 0; f < 561123423; ++f){} }
		,
		{ for(g = 0; g < 10003201; ++g){} }
		,
		{ for(h = 0; h < 80213002; ++h){} }
		,
		{ for(i = 0; i < 561123423; ++i){} }
		,
		{ for(j = 0; j < 10003201; ++j){} }
		,
		{ for(k = 0; k < 80213002; ++k){} }
		,
		{ for(l = 0; l < 561123423; ++l){} }
	);

	auto t1 = w1.stop;


	Stdout.format("p_block = {} {} {}", a, b, c).newline;

	StopWatch w2;
	w2.start;

	{ for(a = 0; a < 10003201; ++a){} }
	{ for(b = 0; b < 80213002; ++b){} }
	{ for(c = 0; c < 561123423; ++c){} }
	{ for(d = 0; d < 10003201; ++d){} }
	{ for(e = 0; e < 80213002; ++e){} }
	{ for(f = 0; f < 561123423; ++f){} }
	{ for(g = 0; g < 10003201; ++g){} }
	{ for(h = 0; h < 80213002; ++h){} }
	{ for(i = 0; i < 561123423; ++i){} }
	{ for(j = 0; j < 10003201; ++j){} }
	{ for(k = 0; k < 80213002; ++k){} }
	{ for(l = 0; l < 561123423; ++l){} }

	auto t2 = w2.stop;

	Stdout.format("linear = {} {} {}", a, b, c).newline;
	
	Stdout.format("p_block time: {}  Linear time: {}  Delta: {}", t1, t2, t2-t1).newline;
}

/** Output **
testing
p_block = 10003201 80213002 561123423
linear = 10003201 80213002 561123423
p_block time: 4.18  Linear time: 6.87  Delta: 2.69
*/

On my quad core it runs 2.7 seconds faster. Reducing the work in each block reduces the delta, though I haven't seen it go negative (meaning linear was faster). This seems to be working okay, but I'm wondering if anyone has ideas to make it faster or cleaner. I also tried doing a for loop - much harder. Any ideas there?
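One idea for the speed question: the waitThreads spin loop burns a core while waiting. Here is a sketch of a blocking variant using Tango's Mutex/Condition and a per-call pool typed on the delegate argument (exact API details may differ between Tango versions, so treat this as untested):

```d
import tango.core.ThreadPool,
       tango.core.sync.Mutex,
       tango.core.sync.Condition;

// Sketch: count completions and block on a condition variable instead of
// spinning on activeJobs/pendingJobs. The pool is ThreadPool!(void delegate())
// so each job receives its block as an argument, which also sidesteps the
// loop-variable capture pitfall in D1.
void p_block(void delegate()[] blocks ...)
{
	auto pool  = new ThreadPool!(void delegate())(12);
	auto mutex = new Mutex;
	auto done  = new Condition(mutex);
	int  left  = blocks.length;

	void run(void delegate() blk)
	{
		blk();
		synchronized (mutex)
			if (--left == 0)
				done.notify;   // last job wakes the caller
	}

	foreach (blk; blocks)
		pool.assign(&run, blk);

	synchronized (mutex)
		while (left > 0)
			done.wait;             // sleeps instead of spinning

	pool.finish;
}
```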


Posted: 08/05/08 00:53:09

I have a decent attempt at a foreach now; it's kind of ugly though.

void p_foreach( T, U, Z ) ( Z list, void delegate( T, U ) block, uint poolSize = 12 )
{
	auto pool = new ThreadPool!( T, U )( poolSize );
	foreach( T key, U item; list )
	{
		pool.assign( block, key, item );
	}
	pool.finish;
}

An example usage on a 2D array..

import 	tango.io.Stdout,
	tango.io.Console,
	tango.time.StopWatch,
	tango.math.Math,
        tango.core.ThreadPool;

void main()
{

	Stdout("Building big list").newline;
	int[][] list;
	list.length = 1_000;

	foreach( ref row; list )
		row.length = 100_000;

	StopWatch pT, lT;
	double pTime, lTime;

	Stdout("p_foreach").newline;
		
	pT.start;

	p_foreach!(int, int[], int[][]) (list, 
	delegate( int key, int[] item )
	{
		foreach(k, i; item)
			list[key][k] += 50 / 23 / 4 +3;
	}, 15);

	pTime = pT.stop;

	Stdout("foreach").newline;
	lT.start;
	
	foreach( key, row; list)
		foreach( k, item; row)
			list[key][k] += 50 / 23 / 4 +3;

	lTime = lT.stop;

	Stdout.formatln("P:{}  L:{}  Delta:{}", pTime, lTime, (lTime - pTime));	

	Stdout("Press Enter").newline;
	Cin.get;
}

Output

Building big list
p_foreach
foreach
P:0.26  L:0.89  Delta:0.63
Press Enter

Posted: 08/05/08 02:16:25 -- Modified: 08/05/08 12:45:36 by od

I've also been messing around with similar attempts from time to time.

See https://ftp.tu-ilmenau.de/index.php?id=be0f0e5257934b31d83cd212e314963b

It covers:

ThreadPool: You can execute any function/delegate asynchronously with any number/kind of arguments (also ref and out, safely) using ThreadPool.exec(...) or ThreadPool.execArgs!(foo)(&foo, foosparam1, foosparam2, ...). So it is not bound to any particular parameter types, and it reuses all threads it has ever created (threads of the ThreadPool, of course), which don't terminate but block waiting when out of work. It works entirely through static methods. Its threads don't exit until the module destructors get called, so any new job will either wake up a waiting thread or create a new one. It also has no size limit, so no races can occur where running jobs wait on a condition that later jobs would fulfill but those never get executed because of the limit. It also uses lock-free algorithms for maintenance as far as possible.

Parallel foreach: for structs, classes, interfaces which have an opApply and for dynamic and associative arrays. It can be used simply by calling

foreach (...; parallel(...)) {...}

instead of

foreach (...; ...) {...}

However, there is still a bug in the dynamic array part. Another issue is that it can currently only wrap the very first opApply of structs, interfaces, or classes when multiple opApplys are defined. Also planned for the foreach and parallel_for stuff (see below) is more control over the parallelized slice size and the number of threads to use.

Hidden in the comments in ThreadPool there is also (not yet tested) support for:

parallel_for - semantics akin to TBB/OpenMP

  parallel_for!("int i=0; i<17; i++")({
    //... if you don't need to access i ...
  });

  parallel_for!("int i=17; i<42; i++")(delegate(int i) {
    //... if you want to access int i ...
  });

  parallel_for!("uint j=17; j<42; j++", uint, "j")(delegate(uint j) {
    //... if you want to use something other than int i ;-) ...
  });

spawn/sync in the semantics of cilk

  spawn({
    foo(); 
    ...
  });
  spawn( bar() ); // lazy
  sync;

with specific sync barriers:

  Barrier bar1, bar2;

  spawn({ A }, bar2 );
  spawn( B );

  bar2.sync; // joins A

  spawn( C, bar1);
  spawn({ D });
  spawn({ E }, bar);

  bar1.sync; // joins C and E
  sync;      // joins B and D

Further stuff: Futures, and a PooledThread that shares most of the semantics of Thread but, each time it is start()ed, employs the ThreadPool to reduce spawning overhead.

Posted: 08/05/08 19:17:42

I have difficulty following how yours works, Od; the underlying system seems very complex. I went for simple.

Posted: 08/05/08 19:30:56

Got my for loop working; it's a little cleaner than my foreach.

import 	tango.io.Stdout,
	tango.io.Console,
	Parallel.ThreadPool,
	tango.time.StopWatch,
	tango.math.Math;

void p_for( T ) ( T start, T stop, T delta, void delegate( T ) block , uint poolSize = 12 )
{
	auto pool = new ThreadPool!( T )( poolSize );
	for( T i = start; i != stop; i += delta )
	{
		pool.assign( block, i );
	}
	pool.finish;
}

void p_for( T, V:void  ) ( T start, T stop, T delta, void delegate( T ) block , ThreadPool!( T ) pool )
{
	if( pool is null )
	{
		p_for!( T ) ( start, stop, delta, block );
		return;
	}
	
	for( T i = start; i != stop; i += delta )
	{
		pool.assign( block, i );
	}
	while( pool.activeJobs || pool.pendingJobs )
		Thread.sleep( 0 );
}

void main()
{
	Stdout( "Building list" ).newline;
	int[] list;
	list.length = 500;

	StopWatch pT1, pT2, lT;
	double pTime1, pTime2, lTime;

	Stdout( "p_for pre made pool" ).newline;
	auto pool = new ThreadPool!( int )( 20 );
	
	pT1.start;

	p_for!( int, void )( 0, list.length, 1,
	delegate( int i )
	{
		list[i] = i;
		for( int j = 1; j < 2_345_678; ++j )
			list[i] %= j;			
	}, pool );

	pTime1 = pT1.stop;


	Stdout( "p_for auto made pool" ).newline;
	pT2.start;

	p_for!( int )( 0, list.length, 1,
	delegate( int i )
	{
		list[i] = i;
		for( int j = 1; j < 2_345_678; ++j )
			list[i] %= j;			
	}, 20);

	pTime2 = pT2.stop;

	Stdout.formatln( "PRE {} AUTO {} Diff {}", pTime1, pTime2, abs( pTime1 - pTime2 ) );

	Stdout( list[list.length-1] ).newline;

	Stdout( "for" ).newline;
	lT.start;

	for( int i = 0; i < list.length; ++i )
	{
		list[i] = i;
		for( int j = 1; j < 2_345_678; ++j )
			list[i] %= j;
	}
	
	lTime = lT.stop;

	Stdout(list[list.length-1]).newline;

	Stdout.formatln( "p_for:{}  for:{}  Delta:{}", min( pTime1, pTime2 ), lTime, abs( lTime - min( pTime1, pTime2 ) ) );	

	Stdout( "Press Enter" ).newline;
	Cin.get;
}

Output

Building list
p_for pre made pool
p_for auto made pool
PRE 4.99 AUTO 5.27 Diff 0.28
0
for
0
p_for:4.99  for:7.15  Delta:2.15
Press Enter

On about a third of the runs the auto-made ThreadPool was faster. Use this on for loops with complex work - it gives a nice speedup!
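For cheap loop bodies the per-index pool.assign overhead dominates. A hedged sketch of a chunked variant (same assumed Tango ThreadPool API as above, untested): each job gets a sub-range instead of a single index, so the assignment cost is amortized over many iterations.

```d
import tango.core.ThreadPool;

// Sketch: split [start, stop) into chunks of `chunk` iterations each.
// Assumes delta > 0 and an ascending loop; not tested.
void p_for_chunked(T)(T start, T stop, T delta,
                      void delegate(T) block,
                      T chunk = 64, uint poolSize = 12)
{
	auto pool = new ThreadPool!(T, T)(poolSize);

	void runRange(T lo, T hi)
	{
		for (T i = lo; i < hi; i += delta)
			block(i);
	}

	for (T lo = start; lo < stop; lo += chunk * delta)
	{
		T hi = lo + chunk * delta;
		if (hi > stop)
			hi = stop;
		pool.assign(&runRange, lo, hi);   // one job per chunk, not per index
	}
	pool.finish;
}
```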

Now we need a good way to get the processor count:

if( PROCESSORS > 1 )
{
	//Parallel code
	poolSize = PROCESSORS * 4;
}
else
{
	//Linear code
}
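A hedged sketch of detecting the processor count in D1: the Win32 SYSTEM_INFO layout is standard, and 84 is glibc's value for _SC_NPROCESSORS_ONLN, but other POSIX platforms use different constants, so treat this as illustrative rather than tested.

```d
version (Windows)
{
	// Standard Win32 SYSTEM_INFO layout; dwNumberOfProcessors is what we want.
	struct SYSTEM_INFO
	{
		ushort wProcessorArchitecture;
		ushort wReserved;
		uint   dwPageSize;
		void*  lpMinimumApplicationAddress;
		void*  lpMaximumApplicationAddress;
		size_t dwActiveProcessorMask;
		uint   dwNumberOfProcessors;
		uint   dwProcessorType;
		uint   dwAllocationGranularity;
		ushort wProcessorLevel;
		ushort wProcessorRevision;
	}
	extern (Windows) void GetSystemInfo(SYSTEM_INFO*);

	uint processorCount()
	{
		SYSTEM_INFO si;
		GetSystemInfo(&si);
		return si.dwNumberOfProcessors;
	}
}
else
{
	extern (C) int sysconf(int);
	const int _SC_NPROCESSORS_ONLN = 84;   // glibc value; varies by platform

	uint processorCount()
	{
		int n = sysconf(_SC_NPROCESSORS_ONLN);
		return n > 0 ? cast(uint) n : 1;   // fall back to 1 on error
	}
}
```

With that, the pool size could default to processorCount() * 4 instead of the hard-coded 12.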

Posted: 08/05/08 21:37:04

Ok, I'll try to make things more clear.

Given

class Foo
{
  // opApply must return an int and propagate the body's return value;
  // literals aren't lvalues, so mutable copies are needed for the ref parameter.
  int opApply(int delegate(ref char[]) foreachbody)
  {
    char[] a = "one".dup, b = "two".dup, c = "three".dup;
    if (auto r = foreachbody(a)) return r;
    if (auto r = foreachbody(b)) return r;
    if (auto r = foreachbody(c)) return r;
    return 0;
  }
}

void main()
{
  auto obj = new Foo;
  
  foreach (x; obj) {
    use( x );
  }
}

Instead of the foreach the compiler calls

obj.opApply(int delegate(ref char[] x) { use(x); }); 

What foreach (x; parallel(obj)) { use(x); } does is create a wrapper around obj. It inspects (the first) Foo.opApply's parameters and mimics them, such that the compiler then calls

WrapperAround(obj).opApply(int delegate(ref char[] x) { use(x); }); 

which may look like

// NOTE: I stripped the synchronization stuff
// recall realforeachbody is the int delegate(ref char[] x) { use(x); }
int opApply(int delegate(ref char[]) realforeachbody)
{
  // A delegate of the inner function proxy is passed to Foo.opApply 
  // instead of the original realforeachbody delegate.
  // It then gets called by Foo.opApply each time with "one", "two", "three".
  int proxy(ref char[] _arg1) 
  {
    // This calls realforeachbody (-> { use(x); }) with _arg1 (one of "one", "two", "three")
    ThreadPool.execArgs!(int delegate(ref char[]))( realforeachbody, _arg1 );
    return 0;
  }

  obj.opApply( &proxy );
  // now obj.opApply calls (or has called) proxy("one"); proxy("two"); proxy("three"); 
  // While proxy("one") places a call to realforeachbody("one") on the ThreadPool and so on.

  // now wait until all of the spawned threads are finished. Again, I stripped that here. 
}

So the controlflow looks like that: The foreach(x; parallel(obj)) { use(x); } becomes

WrapperAround(obj).opApply( int delegate(ref char[] x){ use(x); } );
  calls obj.opApply( &proxy );
     calls proxy("one"); --> places { use(x); } with x="one" on the ThreadPool
     calls proxy("two"); --> places { use(x); } with x="two" on the ThreadPool
     calls proxy("three"); --> places { use(x); } with x="three" on the ThreadPool
  wait until all the calls to { use(x); } on the ThreadPool have finished.

The stuff about calling functions/delegates with arguments on the ThreadPool in a safe way is a bit more tricky. It involves a simple compile-time parser, CTFE_InspectParamsOf, which generates a struct with some constants that can easily be used with mixins to mimic a given function's parameters.

Based on that, a "CallBox" struct is generated which serves as a container into which the arguments of a function/delegate can be stuffed. If the arguments contain ref or out values, a pointer is used instead of the value. So imagine:

  int foo(ref float, bool, out char[], long) { 
    ... 
  } 

  float x;
  char[] y;

  CallBox!(foo) callbox;
  // this generates a
  // struct 
  // {
  //   Tuple!(float*, bool, char[]*, long) tup; 
  //  
  //   void put(ref float _arg0, bool _arg1, out char[] _arg2, long _arg3) 
  //   {
  //     tup[0] = &_arg0; 
  //     tup[1] =  _arg1; 
  //     tup[2] = &_arg2; 
  //     tup[3] =  _arg3;
  //   }
  //
  //   void call(int delegate(ref float, bool, out char[], long) dg)
  //   {
  //     dg( *tup[0], tup[1], *tup[2], tup[3] );
  //   }
  // }

  callbox.put(x, true, y, 42); 
  // Now the parameters values and references are saved in the callbox. 

  // Now we may call foo with the previously boxed arguments.
  callbox.call(&foo);

Now imagine we can put the callbox and a delegate to foo into some thread-local storage (TLS) and then somehow make the thread call tls.callbox.call(tls.delegatetofoo). Then that is pretty much what

  ThreadPool.execArgs!(foo)(&foo, x, true, y, 42); 

does, apart from some details again.
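To make the put/call round trip concrete without the mixin machinery, here is a hand-written equivalent for that signature. CallBox2 and demo are invented names for illustration, and ref stands in for out to keep the sketch simple.

```d
// Hand-expanded version of the generated struct sketched above: pointers
// stand in for ref arguments, values are copied, and call() replays the
// original invocation with the boxed arguments.
struct CallBox2
{
	float* a0;  bool a1;  char[]* a2;  long a3;

	void put(ref float _a0, bool _a1, ref char[] _a2, long _a3)
	{
		a0 = &_a0;  a1 = _a1;  a2 = &_a2;  a3 = _a3;
	}

	void call(int delegate(ref float, bool, ref char[], long) dg)
	{
		dg(*a0, a1, *a2, a3);
	}
}

void demo()
{
	float x;  char[] y;
	CallBox2 box;
	box.put(x, true, y, 42);
	// Later (e.g. on a pool thread) replay the call; writes through the
	// stored pointer reach the original x.
	box.call(delegate int(ref float f, bool b, ref char[] s, long n)
	         { f = 3.14f; return 0; });
	assert(x == 3.14f);
}
```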

This also made it possible to build a kind of universal ThreadPool for all kinds of tasks/delegates/functions with any number/kind of arguments, as opposed to the present tango.core.ThreadPool, which is fixed to one argument structure, does not support ref and out, and does not reuse all of its threads over the complete runtime of the program.

Based on that ThreadPool and a Lightswitch for Semaphores, I could also do the Future, parallel_for, and spawn/sync stuff - but again, these are not even tested yet ;-)