Consumer/Producer Race Condition?

Mar 22, 2011 at 3:38 PM

Our application uses a modified Consumer/Producer design pattern in which several "consumers" need to run concurrently. That is why the STP (SmartThreadPool) works so well for us: I can put the concurrent work items into their own Work Items Group (WIG).

I am seeing very intermittent and odd problems, which in my experience generally means I have a race condition, but I cannot track down its exact location.

I am using a hashtable to store the WIGs, and the producers add new requests to the matching WIG.

I'm not sure how to lock the WIGs while I enqueue. Are they already thread-safe? Is there a built-in method that I can use to keep track of my WIGs?

Below is the producer code:

Hashtable consumers;

public void producer()
{
    do
    {
        IWorkItemsGroup WIG;
        if (sched["inetaddr"] != null)
        {
            // Reuse the WIG for this address if one is already stored
            if (consumers.ContainsKey(sched["inetaddr"]))
            {
                WIG = (IWorkItemsGroup)consumers[sched["inetaddr"]];
            }
            else
            {
                // Otherwise create a WIG with a concurrency of 1 and store it
                WIG = smartThreadPool.CreateWorkItemsGroup(1);
                WIG.Name = sched["inetaddr"];

                consumers[sched["inetaddr"]] = WIG;
            }

            SmartConsumerStruct CS = new SmartConsumerStruct();
            CS.ip = sched["inetaddr"];
            CS.ScheduleID = sched.ScheduleID;

            WIG.QueueWorkItem(new WorkItemInfo() { Timeout = 500 * 1000 }, this.DoSomeWork, CS);
        }
    } while (sched.MoveNext());
}

Coordinator
Mar 22, 2011 at 9:02 PM

Hi,

The WIG itself is thread-safe. What you need to synchronize is access to the consumers hashtable.

Ami

Mar 22, 2011 at 9:07 PM

This is where I'm a little confused. The consumers hashtable just stores a reference to the WIG, and the consumer thread does not have access to the consumers hashtable.

Will locking the consumers hashtable also lock the WIG in the consumer thread?

public void producer()
{
    do
    {
        IWorkItemsGroup WIG;
        if (sched["inetaddr"] != null)
        {
            lock (consumers)
            {
                if (consumers.ContainsKey(sched["inetaddr"]))
                {
                    WIG = (IWorkItemsGroup)consumers[sched["inetaddr"]];
                }
                else
                {
                    WIG = smartThreadPool.CreateWorkItemsGroup(1);
                    WIG.Name = sched["inetaddr"];

                    consumers[sched["inetaddr"]] = WIG;
                }

                SmartConsumerStruct CS = new SmartConsumerStruct();
                CS.ip = sched["inetaddr"];
                CS.ScheduleID = sched.ScheduleID;

                WIG.QueueWorkItem(new WorkItemInfo() { Timeout = 500 * 1000 }, this.DoSomeWork, CS);
            }
        }
    } while (sched.MoveNext());
}

Coordinator
Mar 22, 2011 at 9:22 PM

The answer is no.
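
A minimal sketch of why, using stand-in types rather than SmartThreadPool itself: `lock (consumers)` takes the monitor of the hashtable object only. It holds back other threads that try to lock that same hashtable, not threads that use an object stored in it. In the sketch, `ConcurrentQueue<int>` is a hypothetical stand-in for a thread-safe WIG, and `LockScopeDemo` is an illustrative name that does not come from the library.

```csharp
using System.Collections;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class LockScopeDemo
{
    // Returns true if a second thread could use the stored object
    // while this thread was holding the lock on the table.
    public static bool ValueUsableWhileTableLocked()
    {
        var consumers = new Hashtable();
        var group = new ConcurrentQueue<int>();   // stand-in for a thread-safe WIG
        consumers["10.0.0.1"] = group;

        lock (consumers)
        {
            // The worker never touches the table, only the stored object,
            // so it does not contend for the table's monitor.
            Task worker = Task.Run(() => group.Enqueue(42));
            return worker.Wait(5000) && group.Count == 1;
        }
    }
}
```

The worker finishes while the table is still locked, so the lock is only needed around the lookup and insert on the hashtable.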

Your code should look like this:

public void producer()
{
    do
    {
        if (sched["inetaddr"] != null)
        {
            // Try to get the WIG without locking. A Hashtable allows
            // lock-free readers as long as all writers are serialized.
            IWorkItemsGroup WIG = (IWorkItemsGroup)consumers[sched["inetaddr"]];
            if (WIG == null)
            {
                // If it doesn't exist then lock
                lock (consumers)
                {
                    // Check if someone else was faster and added the WIG
                    WIG = (IWorkItemsGroup)consumers[sched["inetaddr"]];
                    if (WIG == null)
                    {
                        // Create a new WIG and add it
                        WIG = smartThreadPool.CreateWorkItemsGroup(1);
                        WIG.Name = sched["inetaddr"];
                        consumers[sched["inetaddr"]] = WIG;
                    }
                }
            }

            // Use the WIG
            SmartConsumerStruct CS = new SmartConsumerStruct();
            CS.ip = sched["inetaddr"];
            CS.ScheduleID = sched.ScheduleID;
            WIG.QueueWorkItem(new WorkItemInfo() { Timeout = 500 * 1000 }, this.DoSomeWork, CS);
        }
    } while (sched.MoveNext());
}

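
If the project targets .NET 4 or later, one possible alternative to the hand-written check, lock, check-again sequence is `ConcurrentDictionary.GetOrAdd`. The sketch below is an illustration under assumptions, not SmartThreadPool code: `GroupRegistry`, `GroupRegistryDemo` and the `create` delegate are hypothetical names, `TGroup` stands in for `IWorkItemsGroup`, and the key is assumed to be a string. The values are wrapped in `Lazy<T>` because `GetOrAdd` may call its factory more than once under contention; with the wrapper, only the stored entry ever creates a group.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: one lazily created group per key.
public sealed class GroupRegistry<TGroup>
{
    private readonly ConcurrentDictionary<string, Lazy<TGroup>> groups =
        new ConcurrentDictionary<string, Lazy<TGroup>>();
    private readonly Func<string, TGroup> create;

    public GroupRegistry(Func<string, TGroup> create)
    {
        this.create = create;
    }

    public TGroup GetOrCreate(string key)
    {
        // Several Lazy wrappers may be built under contention, but only one
        // is stored, and only the stored one ever has its Value requested.
        Lazy<TGroup> lazy = groups.GetOrAdd(
            key,
            k => new Lazy<TGroup>(() => create(k), LazyThreadSafetyMode.ExecutionAndPublication));
        return lazy.Value;
    }
}

public static class GroupRegistryDemo
{
    // Calls GetOrCreate for the same key from many threads and
    // returns how many times the create delegate actually ran.
    public static int CountCreations(int callers)
    {
        int created = 0;
        var registry = new GroupRegistry<object>(key =>
        {
            Interlocked.Increment(ref created);
            return new object();
        });
        Parallel.For(0, callers, i => { registry.GetOrCreate("10.0.0.1"); });
        return created;
    }
}
```

In the producer, the `create` delegate would presumably wrap `smartThreadPool.CreateWorkItemsGroup(1)` and set the `Name`, and the loop body would reduce to a single `GetOrCreate` call followed by `QueueWorkItem`.
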
Mar 22, 2011 at 9:24 PM

I greatly appreciate your time and help with this. I will give it a try.

Thank You