Download Reference Manual
The Developer's Library for D
About Wiki Forums Source Search Contact

ThreadGroup.joinEach()

Moderators: kris

Posted: 02/13/08 22:23:52

Hi,

I'd like to toss in an idea for a slight improvement that I had while working with ThreadGroup?. I'd propose a joinEach() method in ThreadGroup?, so you can "collect" each working thread one after another right when they finished, not after all finished. So the basic idea is just to also use the main/joining thread for processing the results of the workerthreads instead of just waiting for the last one to get finished. The following example actually doesn't really need a ThreadGroup? for the work but I first implemented something similar while expecting that ThreadGroup? could be prettily used for this purpose:

module threadworker;

import
	tango.core.Thread,
	tango.math.Random,
	tango.group.log,
	tango.group.text;
	

static this()
{
	auto rootLog = Log.getRootLogger();
	rootLog.setLevel( Logger.Level.Trace );
	rootLog.addAppender( new ConsoleAppender() );
}


alias char[] Job;
alias char[] Result;

class WorkerThread : Thread
{
	private static Logger log;
	
	static this()
	{
		log = Log.getLogger( "Worker" );
	}
	
	public Job job;
	public Result result;
	public bool jobdone;
	
	this( Job job )
	{
		log.info( "Thread for "~job~" created. Waiting to start ..." );
		this.job = job;
		this.jobdone = false;
		super ( &run );
	}
	
	private void run()
	{
		auto time = cast(double)(Random.shared.next%5000 );
		
		log.info( "Thread "~this.name~" started. Now working: "~job~" ... assuming to need at least "~Integer.toString(cast(long)time/1000)~"sec for this" );		
		
		sleep( time/1000 );		
		result = "Done "~job;
		jobdone = true;
		
		log.info( "Thread "~this.name~" finished: "~job );
	}
}
	
int main(char[][] args) 
{
	auto log = Log.getLogger( "Supervisor" );
	log.info( "Acquiring Workers ..." );

	Job[] jobs = ["Wash dishes", "Clean the bath", "Gain world domination", "Buy some beer", "Buy some more beer", "Read some paper", "Learn D", "Learn to Tango with D"];
	scope threads = new ThreadGroup();
	
	foreach( job; jobs ) {

		static threadid = 0;

		log.info( "Creating WorkerThread for: "~job );
		
		auto worker = new WorkerThread(job);
		worker.name = "#"~Integer.toString(threadid++);

		threads.add( worker );
		worker.start();

	}

	for (;;) {

		bool threads_allbusy = true;
		uint threads_running = 0;

		foreach (thread; threads) {

			WorkerThread worker = cast(WorkerThread)thread;
			assert ( worker !is null );

			if ( thread.isRunning() ) {

				threads_running++;
				if (worker.jobdone) {
					log.warn( "Thread "~thread.name~" is running but worker_finished flag is set! What the heck is he doing there?" );
				}

			}
			else {

				threads_allbusy = false;
				if (!worker.jobdone) {
					log.error( "Thread "~thread.name~" is not running anymore but finished flag is not set! Where is he gone?" );
				}
				
				threads.remove( thread );

				log.info( "Thread "~thread.name~" worked out following results: "~worker.result );

			}

		}

		if (threads_running==0) {
			break;
		}

		if (threads_allbusy) {
			log.info( "None of the WorkerThreads finished this cycle... yielding" );
			// Thread.getThis().yield(); 
			Thread.getThis().sleep( 1 );
		}
		
	}
	
	threads.joinAll(); // anyways there should be no more remaining.

	log.info( "All Workers finished doing their jobs! Going home now ..." );
	
	return 0;
}

Which btw prettily yielded:

Info  Supervisor - Acquiring Workers ...
Info  Supervisor - Creating WorkerThread for: Wash dishes
Info  Worker - Thread for Wash dishes created. Waiting to start ...
Info  Supervisor - Creating WorkerThread for: Clean the bath
Info  Worker - Thread for Clean the bath created. Waiting to start ...
Info  Supervisor - Creating WorkerThread for: Gain world domination
Info  Worker - Thread for Gain world domination created. Waiting to start ...
Info  Supervisor - Creating WorkerThread for: Buy some beer
Info  Worker - Thread for Buy some beer created. Waiting to start ...
Info  Supervisor - Creating WorkerThread for: Buy some more beer
Info  Worker - Thread for Buy some more beer created. Waiting to start ...
Info  Worker - Thread #0 started. Now working: Wash dishes ... assuming to need at least 4sec for this
Info  Worker - Thread #1 started. Now working: Clean the bath ... assuming to need at least 1sec for this
Info  Worker - Thread #2 started. Now working: Gain world domination ... assuming to need at least 2sec for this
Info  Supervisor - Creating WorkerThread for: Read some paper
Info  Worker - Thread for Read some paper created. Waiting to start ...
Info  Supervisor - Creating WorkerThread for: Learn D
Info  Worker - Thread for Learn D created. Waiting to start ...
Info  Supervisor - Creating WorkerThread for: Learn to Tango with D
Info  Worker - Thread for Learn to Tango with D created. Waiting to start ...
Info  Supervisor - None of the WorkerThreads finished this cycle... yielding
Info  Worker - Thread #3 started. Now working: Buy some beer ... assuming to need at least 0sec for this
Info  Worker - Thread #4 started. Now working: Buy some more beer ... assuming to need at least 1sec for this
Info  Worker - Thread #5 started. Now working: Read some paper ... assuming to need at least 4sec for this
Info  Worker - Thread #6 started. Now working: Learn D ... assuming to need at least 0sec for this
Info  Worker - Thread #7 started. Now working: Learn to Tango with D ... assuming to need at least 1sec for this
Info  Worker - Thread #3 finished: Buy some beer
Info  Worker - Thread #6 finished: Learn D
Info  Supervisor - Thread #3 worked out following results: Done Buy some beer
Info  Supervisor - Thread #6 worked out following results: Done Learn D
Info  Supervisor - None of the WorkerThreads finished this cycle... yielding
Info  Worker - Thread #7 finished: Learn to Tango with D
Info  Worker - Thread #1 finished: Clean the bath
Info  Worker - Thread #4 finished: Buy some more beer
Info  Supervisor - Thread #1 worked out following results: Done Clean the bath
Info  Supervisor - Thread #4 worked out following results: Done Buy some more beer
Info  Supervisor - Thread #7 worked out following results: Done Learn to Tango with D
Info  Supervisor - None of the WorkerThreads finished this cycle... yielding
Info  Worker - Thread #2 finished: Gain world domination
Info  Supervisor - Thread #2 worked out following results: Done Gain world domination
Info  Supervisor - None of the WorkerThreads finished this cycle... yielding
Info  Supervisor - None of the WorkerThreads finished this cycle... yielding
Info  Worker - Thread #0 finished: Wash dishes
Info  Worker - Thread #5 finished: Read some paper
Info  Supervisor - Thread #5 worked out following results: Done Read some paper
Info  Supervisor - Thread #0 worked out following results: Done Wash dishes
Info  Supervisor - All Workers finished doing their jobs! Going home now ...

With a nifty ThreadGroup?.joinEach() the supervising/collecting work (last for loop within main) could be broken down to a simple:

	foreach (thread; threads.joinEach()) {
	
		WorkerThread worker = cast(WorkerThread)thread;
		assert ( worker !is null );
		
		if (!worker.jobdone) {
			log.error( "Thread "~thread.name~" is not running anymore but finished flag is not set! Where is he gone?" );
		}
			
		log.info( "Thread "~thread.name~" worked out following results: "~worker.result );
			
	}

As I mentioned before I found me implementing something similar to the above example using ThreadGroup? because I had the expectation in mind that ThreadGroup? was capable of doing something like that ;-)

BTW: I'd also suggest to extend the isRunning() and isDaemon() description in the docs to also reflect what could be expected from these states. e.g. When isRunning is true: Also before the Thread got start()ed or when it is not scheduled right now (suspended/blocked/waiting)? And is isDaemon just the state after run() exited or maybe also in other cases?

Kind regards, Oliver

There are no responses to display.