关于生产者和消费者问题,为什么主函数里只运行一次
其中测试文件ThreadInfo.txt为:3
1 p 3
2 p 4
3 C 4 1
4 P 2
5 C 3 1 2 4
每行这件有TAB键隔开
#include "stdafx.h"
const int MaxProductNum=100;//一个线程允许消费的最大产品数
struct ThreadInfo{
//线程基本信息
int ThreadNumber;//记录该线程的编号
char ThreadType;//记录该线程时生产者还是消费者
int ThreadSleepTime;//记录该线程的休眠时间
//Produce信息
int ProductAddr;//记录生产的产品所存放的缓冲区
int ProductProducedNum;//记录该产品要被消费的次数
//Consume信息
int ProductConsuming;//下一个要被消费的产品号
int ProductNum;//记录该线程要消费的产品数
int ProductConsumed[MaxProductNum];//记录该线程要消费的产品
ThreadInfo()
{
ProductProducedNum=0;
ProductConsuming=0;
ProductNum=0;
}
};
const int MaxThreadNum=100;//允许的最大线程数
int ThreadNum=0;//实际的线程数
ThreadInfo Threads[MaxThreadNum];//线程信息数组
int* ThreadID;//指向一个用于存放线程号的数组,因为CreateThread()函数的参数是引用类型,为了区分
int BufferNum;//存放产品的缓冲区数
const char *FileAddr="D:\\ThreadInfo.txt";//数据文件
int *Buffer_Critical;//缓冲区
bool *Has_Product;//记录相应缓冲区是否有产品,和Buffer_Critical及Produce()配合使用
/**********************************************************************************************8
*******/
HANDLE h_Semaphore[MaxThreadNum];//对应线程的信号量
HANDLE h_Full;//初始化为BufferNum,判断是否有空缓冲区
HANDLE h_Mutex;//用于分配缓冲区时互斥
HANDLE *h_Threads;//线程HANDLE数组,存放各个线程的HANDLE
/**********************************************************************************************
********/
void ReadTestInfoFile()//读数据文件
{
ThreadInfo *ti;
ifstream input_file;
char ch;
input_file.open(FileAddr,ios::binary);//以二进制方式打开文件
if(input_file)
{
if(input_file.get(ch))BufferNum=ch-'0';//获得缓冲区数
input_file.get(ch);input_file.get(ch);//换行
do{
ti=&(Threads[ThreadNum++]);
if(input_file.get(ch))ti->ThreadNumber=ch-'0';if(input_file.get(ch));//读取线程号
if(input_file.get(ch))ti->ThreadType=ch;if(input_file.get(ch));//读取线程类型(P/C)
if(input_file.get(ch))ti->ThreadSleepTime=ch-'0';//读取睡眠时间
if(ti->ThreadType=='P')//生产者
input_file.get(ch);
else if(ti->ThreadType=='C')//消费者
while(input_file.get(ch)&&ch==' ')
{
input_file.get(ch);
ti->ProductConsumed[ti->ProductNum++]=ch-'0'-1;
}
}while(input_file.get(ch));
}
}
void InitData()//统计数据,并初始化变量
{
for(int i=0;i<ThreadNum;i++)
if(Threads[i].ThreadType=='C')
{
for(int j=0;j<Threads[i].ProductNum;j++)
{
(Threads[Threads[i].ProductConsumed[j]].ProductProducedNum)++;
}
}
ThreadID=new int[ThreadNum];
for(int i=0;i<ThreadNum;i++)
ThreadID[i]=i;
for(int i=0;i<MaxThreadNum;i++)
h_Semaphore[i]=CreateSemaphore(NULL,0,MaxThreadNum,NULL);
h_Full=CreateSemaphore(NULL,BufferNum,MaxThreadNum,NULL);
h_Mutex=CreateMutex(NULL,FALSE,NULL);
}
void DelData()//删除动态申请的内存空间
{
for(int m=0;m<ThreadNum;m++)CloseHandle(h_Threads[m]);
delete []h_Threads;
delete []Buffer_Critical;
delete []Has_Product;
//delete []FileAddr;
}
/************************************************************************
************/
void Get_Buffer(int j)//分配缓冲区,j为线程号
{
int k=0;
while(k<BufferNum&&Has_Product[k])k++;
Threads[j].ProductAddr=k;
}
void Produce(int j)//生产,j为线程号
{
Buffer_Critical[Threads[j].ProductAddr]=j;
}
bool HasConsumeReques(int j)//判断是否有消费请求,j为线程号
{
return(Threads[j].ProductNum>=Threads[j].ProductConsuming);
}
int GetProductID(int j)//取产品号,j为线程号
{
int n=Threads[j].ProductConsuming++;
return Threads[j].ProductConsumed[n];
}
void Consume(int n)//消费,n为产品号
{
Threads[n].ProductProducedNum--;
}
bool HasProductLeft(int n)//判断该产品是否还要消费,n为产品号
{
return(Threads[n].ProductProducedNum>0);
}
void DeleteProduct(int n)//删除产品,n为产品号
{
Has_Product[Threads[n].ProductAddr]=false;
}
/**********************************************************
********/
void SleepTime(int t)
{
for(;t>0;t--)
for(long i=10000;i>0;i--)
for(long j=10000;j>0;j--);
}
DWORD WINAPI Producer(LPVOID lpParam)//生产者线程函数,lpParam是线程号
{
int t=Threads[*(int*)lpParam].ThreadSleepTime;
SleepTime(t);
cout<<"ProducerThread:"<<*(int*)lpParam+1<<"Start"<<endl;
WaitForSingleObject(h_Full,INFINITE);//申请空的缓冲区
WaitForSingleObject(h_Mutex,INFINITE);
Get_Buffer(*(int*)lpParam);//为线程分配缓冲区
ReleaseMutex(h_Mutex);
cout<<"ProducerThread:"<<*(int*)lpParam+1<<"Produce"<<endl;
Produce(*(int*)lpParam);
ReleaseSemaphore(h_Semaphore[*(int*)lpParam],1,NULL);
cout<<"ProducerThread:"<<*(int*)lpParam+1<<"End"<<endl;
return 0;
}
DWORD WINAPI Consumer(LPVOID lpParam)//生产者线程函数,lpParam是线程号
{
int t=Threads[*(int*)lpParam].ThreadSleepTime;
SleepTime(t);
cout<<"ConsumerThread:"<<*(int*)lpParam+1<<"Start"<<endl;
int n=GetProductID(*(int*)lpParam);
while(HasConsumeReques(*(int*)lpParam))//是否有消费需求
{
WaitForSingleObject(h_Semaphore[n],INFINITE);
cout<<"ConsumerThread:"<<*(int*)lpParam+1<<"Consume:"<<n+1<<endl;
Consume(n);
if(HasProductLeft(n))
{
ReleaseSemaphore(h_Semaphore[n],1,NULL);
}
else
{
DeleteProduct(n);
ReleaseMutex(h_Full);
}
n=GetProductID(*(int*)lpParam);
}
cout<<"ConsumerThread:"<<*(int*)lpParam+1<<"End"<<endl;
return 0;
}
/**************************************************************************************
************/
void main()
{
ReadTestInfoFile();
InitData();
Buffer_Critical=new int[BufferNum];
Has_Product=new bool[BufferNum];
for(int n=0;n<BufferNum;n++)
Has_Product[n]=false;
h_Threads=new HANDLE[ThreadNum];
for(int m=0;m<ThreadNum;m++)
{
if(Threads[m].ThreadType=='P')
h_Threads[m]=CreateThread(NULL,0,Producer,&ThreadID[m],0,NULL);
else if(Threads[m].ThreadType=='C')
h_Threads[m]=CreateThread(NULL,0,Consumer,&ThreadID[m],0,NULL);
}
_getch();
DelData();
}