请教VC6 /LINUX GCC 多线程处理文本文件的问题
对一个很大的文本文件(例如 8-9GB)做处理,单线程的做法是循环执行:读文件、处理、写输出文件。测试发现处理部分比较耗时,所以想到是否可以用多线程来处理:读文件 1 个线程、处理 N 个线程、写文件 1 个线程?
对处理的 N 个线程,想预先分配好最大的 N 个输入 BUFFER,但是输出 BUFFER 比较纠结,是预先申请好最大的 N 个 BUFFER?还是用的时候临时申请好呢(用完再释放)?
I/O buffer与线程的对应关系(想通过这个来防止惊群问题):
inBuffer[0] --> 处理线程1 --> outBuffer[0]
inBuffer[1] --> 处理线程2 --> outBuffer[1]
...
inBuffer[n-1] --> 处理线程n --> outBuffer[n-1]
对每个buffer,记录它的状态:inUse 还是 free 等
1、读文件的线程循环检查这N个buffer,看到哪个free,就读文件(M行,按inBuffer最大值读,往回找回车换行)到其中,然后事件通知对应的处理线程。
2、处理线程 i 接到事件后,知道 inBuffer[i-1] 有待处理内容,再看 outBuffer[i-1] 是否空闲:
若是,开始处理,处理结束给写文件的线程发事件通知
若否,等待写文件的线程给它通知(感觉这里会成为瓶颈)
3、写文件的线程:为了保证写文件的顺序性,接到处理线程的事件通知后,需检查 outBuffer[i-1] 对应的文件块号是否为已写块号+1,若是,则写入输出文件,若否,继续等待
本人刚学多线程,上面的设计是否有严重问题?应该怎么设计?
Linux 的 pthread_cond_wait 在 Windows 下可以用 WaitForSingleObject 和 CriticalSection 结合来模拟吗?
请各位高手不吝指教,谢谢!
源码如下,请各位多多批评指正!谢谢!
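/* Multithreading exercise: convert every line of the input file to upper case
and save the result as the output file.
Thread ratio under test: reader (RT) : processing (PT) : writer (WT) = 1 : N : 1
cc -DDBG -DDBG1 -DDBG2 -DDBG3 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE -D_LARGEFILE64_SOURCE mt1n1.c -o mt1n1 -lpthread
*/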
#ifdef _WIN32
#if !defined(_MT)
#error _beginthreadex requires a multithreaded C run-time library.
#endif
#define STRICT 1
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <process.h> /* _beginthread, _endthread */
#include <io.h> //_open(),_lseeki64(),_read(),_write(),_close()
#include <fcntl.h> //_O_BINARY
#include <sys/stat.h> //_S_IWRITE
#include <time.h> //_tzset()
#else
#include <pthread.h>
#include <sys/types.h> //off64_t
#include <sys/stat.h> //off_t, S_IRGRP,...
#include <sys/time.h> //gettimeofday()
#include <time.h> //time()
#include <unistd.h> //lseek64(),close(),read(),write()
#include <fcntl.h> //open64
#endif
#include <ctype.h> //toupper()
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/*
 * Debug trace macros. Each DBGPRTn(...) is used like printf(...).
 *   DBG  -> DBGPRT   (main)
 *   DBG1 -> DBGPRT1  (reader thread)
 *   DBG2 -> DBGPRT2  (processing threads)
 *   DBG3 -> DBGPRT3  (writer thread)
 *
 * When the corresponding DBGn symbol is not defined the call collapses to
 * "1 ? (void)0 : (void)printf(...)": still a single expression statement
 * (safe inside if/else without braces), still type-checked by the compiler,
 * but the arguments are never evaluated and nothing is printed.  This form
 * needs no variadic-macro support, so it also works with old compilers.
 */
#ifdef DBG
#define DBGPRT printf
#else
#define DBGPRT 1 ? (void)0 : (void)printf
#endif
#ifdef DBG1
#define DBGPRT1 printf
#else
#define DBGPRT1 1 ? (void)0 : (void)printf
#endif
#ifdef DBG2
#define DBGPRT2 printf
#else
#define DBGPRT2 1 ? (void)0 : (void)printf
#endif
#ifdef DBG3
#define DBGPRT3 printf
#else
#define DBGPRT3 1 ? (void)0 : (void)printf
#endif
/*
 * random_1(a, b): pseudo-random integer in the half-open range [a, b)
 * (includes a, excludes b).  Requires b > a.
 * Arguments are fully parenthesized so expressions such as random_1(x+1, y)
 * expand correctly; note that each argument is evaluated more than once.
 */
#define random_1(a,b) ((rand() % ((b) - (a))) + (a))
/*
 * random_2(a, b): pseudo-random integer in the closed range [a, b]
 * (includes both a and b).  Requires b >= a.
 */
#define random_2(a,b) ((rand() % ((b) - (a) + 1)) + (a))
/*
 * Buffer geometry.
 *
 * BUF_LEN   - default size in bytes of every input/output buffer.
 * FIND_BACK - how far back from the end of a full buffer the reader looks
 *             for the last line break.
 *
 * The values below are deliberately tiny to make testing easy; the values
 * originally intended for real use were 16*1024*1024 (16MB) for BUF_LEN and
 * 64*1024 (64KB) for FIND_BACK.  Both macros are parenthesized so they can be
 * used safely inside larger expressions.
 */
#define BUF_LEN (1*1024) /* 1KB */
#define FIND_BACK (256) /* 0.25KB */
/*
 * Layout of one full input buffer as used by the reader thread:
 *
 *   p1                                         p2        p3
 *   |<--------------- buf_len bytes -------------------->|
 *   |------------------------------------------|---------|
 *                                      |<-- FIND_BACK -->|
 *
 *   p1 = first byte of the buffer
 *   p3 = last byte of the buffer
 *   p2 = last '\n' found when scanning backwards from p3; the bytes after
 *        p2 are carried over to the start of the next block.
 */
/* Actual buffer size in bytes; main() may override it from the command line. */
size_t buf_len = BUF_LEN;
/* Maximum number of processing threads (and of input/output buffer pairs). */
#define MAX_PT_NUM 60

/*
 * One I/O buffer slot together with its hand-over state.
 */
struct buf_t
{
    /* 0:free, 1:reading, 2:read end, 3:processing, 4:proc end, 5:writing */
    int bufstat;
    /* Sequence number of the file block held in this buffer. */
    int fileBlockNo;
    /* Number of valid bytes in bufptr. */
    int buflen;
    /* The buffer memory itself. */
    char *bufptr;
};
typedef struct buf_t BUF_T;
/*
 * Synchronization objects shared by the reader, processing and writer threads.
 * Index i always refers to processing thread i and its buffer pair.
 */
#ifdef _WIN32
/* Held by the reader while it scans gtReadBuf[] for a free buffer. */
CRITICAL_SECTION g_cs_bufi;
/* Held by the writer while it scans gtWriteBuf[] for the next block. */
CRITICAL_SECTION g_cs_bufo;
/* Reader waits on this; processing threads set it after finishing a block. */
HANDLE hEvent_bufi;
/* Writer waits on this; set by processing threads and by the reader on exit. */
HANDLE hEvent_bufo;
/* Processing thread i waits on hEvent_r[i] for input to arrive. */
HANDLE hEvent_r[MAX_PT_NUM];
/* Processing thread i waits on hEvent_w[i] for its output buffer to be free. */
HANDLE hEvent_w[MAX_PT_NUM];
#else
/* Mutex/condition pair i: processing thread i waits for input to arrive. */
pthread_mutex_t g_mutex_r[MAX_PT_NUM];
pthread_cond_t g_condv_r[MAX_PT_NUM];
/* Mutex/condition pair i: processing thread i waits for a free output buffer. */
pthread_mutex_t g_mutex_w[MAX_PT_NUM];
pthread_cond_t g_condv_w[MAX_PT_NUM];
/* Reader waits here for a free input buffer. */
pthread_mutex_t bufi_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t bufi_threshold_cv = PTHREAD_COND_INITIALIZER;
/* Writer waits here for the next processed block. */
pthread_mutex_t bufo_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t bufo_threshold_cv = PTHREAD_COND_INITIALIZER;
#endif

/* Set by the reader when it exits, to its final block counter value. */
int giRFBN = 0;
/* Input buffers: filled by the reader, consumed by processing thread i. */
BUF_T gtReadBuf[MAX_PT_NUM];
/* Output buffers: filled by processing thread i, drained by the writer. */
BUF_T gtWriteBuf[MAX_PT_NUM];
/* Number of processing threads actually in use. */
int giTN = 3;
/*
 * Reader status: 0 = still reading, 1 = end of file, -1 = read error,
 * 2 = no line break found within FIND_BACK bytes of the end of a full buffer.
 */
int eof_err = 0;
/* Set to 1 when the reader thread has finished. */
int rtf_exit = 0;
/* Set to 1 by a processing thread when it finishes. */
int ptf_exit = 0;
/* Set to 1 when the writer thread has finished. */
int wtf_exit = 0;
/*
 * Per-thread argument block handed to every thread entry point.
 */
struct thread_data
{
    /* Index of the thread; for processing threads also the buffer index. */
    long thread_id;
    /* Result reported by the thread: 0 on success, negative on failure. */
    int err;
    /* File descriptor: input file for the reader, output file for the writer. */
    int fd;
    /* offset and len are not referenced by the code shown in this file. */
#ifndef _WIN32
    off_t offset;
    off_t len;
#else
    __int64 offset;
    __int64 len;
#endif
};

/* One slot per processing thread plus one each for the reader and the writer. */
struct thread_data thread_data_array[MAX_PT_NUM+2];
/*
 * Constants subtracted from a Windows FILETIME-based value to convert it to
 * the Unix epoch.  DELTA_EPOCH_IN_MICROSECS is in microseconds;
 * DELTA_EPOCH_IN_MICROSECS10 is ten times that value.
 * Only the 64-bit literal suffix differs between compilers.
 */
#if defined(_MSC_VER) || defined(_MSC_EXTENSIONS)
#define DELTA_EPOCH_U64(x) x##Ui64
#else
#define DELTA_EPOCH_U64(x) x##ULL
#endif
#define DELTA_EPOCH_IN_MICROSECS DELTA_EPOCH_U64(11644473600000000)
#define DELTA_EPOCH_IN_MICROSECS10 DELTA_EPOCH_U64(116444736000000000)
#if defined(_MSC_VER)
struct timeval {
long tv_sec; /* seconds */
long tv_usec; /* and microseconds */
};
struct timezone
{
int tz_minuteswest; /* Minutes west of GMT. */
int tz_dsttime; /* Nonzero if DST is ever in effect. */
};
int gettimeofday(struct timeval *tv, struct timezone *tz)
{
FILETIME ft;
unsigned __int64 tmpres = 0;
static int tzflag = 0;
if (NULL != tv)
{
GetSystemTimeAsFileTime(&ft);
tmpres |= ft.dwHighDateTime;
tmpres <<= 32;
tmpres |= ft.dwLowDateTime;
tmpres /= 10; /*convert into microseconds*/
/*converting file time to unix epoch*/
tmpres -= DELTA_EPOCH_IN_MICROSECS;
tv->tv_sec = (long)(tmpres / 1000000UL);
tv->tv_usec = (long)(tmpres % 1000000UL);
}
if (NULL != tz)
{
if (!tzflag)
{
_tzset();
tzflag++;
}
tz->tz_minuteswest = _timezone / 60;
tz->tz_dsttime = _daylight;
}
return 0;
}
/* ------------------------------------------------------------------------- */
/*
 * Return a human-readable description of a Win32 error code.
 *
 * GLE - the error code, typically the result of GetLastError().
 *
 * Returns a pointer to a NUL-terminated message with trailing CR/LF removed,
 * or "unknown Windows error" when the system has no text for the code.
 * The message lives in a static buffer: it stays valid after the call but is
 * overwritten by the next call, so the function is not thread-safe.
 *
 * The earlier version read the message through an uninitialized pointer and
 * returned memory it had already released with LocalFree().
 */
char *GetWin32ErrMsg(DWORD GLE)
{
    static char szMessage[512];
    DWORD n;

    /* FormatMessageA: the result is returned as char *, so force ANSI. */
    n = FormatMessageA(FORMAT_MESSAGE_IGNORE_INSERTS |
                       FORMAT_MESSAGE_FROM_SYSTEM |
                       FORMAT_MESSAGE_MAX_WIDTH_MASK,
                       NULL,
                       GLE,
                       MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
                       szMessage,
                       (DWORD)sizeof(szMessage),
                       NULL);
    if (n == 0)
    {
        strcpy(szMessage, "unknown Windows error");
        return szMessage;
    }
    /* Strip the line break the system appends to most messages. */
    while (n > 0 && (szMessage[n-1] == '\n' || szMessage[n-1] == '\r'))
    {
        szMessage[--n] = 0;
    }
    return szMessage;
}
#endif
/* ------------------------------------------------------------------------- */
/*
 * String copy: copies n bytes from src to dest and appends a NUL after them.
 * The regions may overlap.  dest must have room for n + 1 bytes.
 * Returns dest.
 */
void *strmovcpy(void *dest, const void *src, size_t n)
{
    char *out = memmove(dest, src, n);

    out[n] = '\0';
    return dest;
}
#ifdef _WIN32
unsigned __stdcall readf_thread_func(void *threadarg)
#else
void *readf_thread_func(void *threadarg)
#endif
{
struct thread_data *my_data;
long taskid;
int rc = 0, read_ret;
int i, k, n;
char *rem_buf = NULL;
char *p, *p1, *p2, *p3;
int rem_len = 0;
char tmpbuf[BUF_LEN+1];
my_data = (struct thread_data *) threadarg;
taskid = my_data->thread_id;
DBGPRT1("@readf_thread_func(), taskid=%ld\n", taskid);
for(n = 1; ; n++)
{
DBGPRT1("@readf_thread_func(): thread %ld locking mutex.\n", taskid);
#ifdef _WIN32
#else
pthread_mutex_lock(&bufi_mutex);
#endif
while (1)
{
#ifdef _WIN32
EnterCriticalSection( &g_cs_bufi );
#else
#endif
for(i = 0, k = -1; i < giTN; i++)
{
// if(giReadBufStat[i] == 0)
if(gtReadBuf[i].bufstat == 0)
{
k = i;
break;
}
}
#ifdef _WIN32
if(k >= 0)
{
// giReadBufStat[k] = 1;
// giFileBlockNo[k] = n;
gtReadBuf[k].bufstat = 1;
gtReadBuf[k].fileBlockNo = n;
LeaveCriticalSection( &g_cs_bufi );
break;
}
LeaveCriticalSection( &g_cs_bufi );
#else
if(k >= 0) break;
#endif
#ifdef _WIN32
DBGPRT1("=== readf_thread_func(): thread %ld, before WaitForSingleObject...\n", taskid);
WaitForSingleObject( hEvent_bufi, INFINITE );
DBGPRT1("*** readf_thread_func(): thread %ld, after WaitForSingleObject.\n", taskid);
#else
DBGPRT1("=== readf_thread_func(): thread %ld, Going into wait...\n", taskid);
pthread_cond_wait(&bufi_threshold_cv, &bufi_mutex);
DBGPRT1("*** readf_thread_func(): thread %ld Condition signal received.\n", taskid);
#endif
}
DBGPRT1("@readf_thread_func(): thread %ld Reading file, k=%d ...\n", taskid, k);
#ifdef _WIN32
#else
// giReadBufStat[k] = 1;
// giFileBlockNo[k] = n;
gtReadBuf[k].bufstat = 1;
gtReadBuf[k].fileBlockNo = n;
pthread_mutex_unlock(&bufi_mutex);
#endif
DBGPRT1("@readf_thread_func(): thread %ld Unlocking mutex.\n", taskid);
// p1 = bufi[k];
p1 = gtReadBuf[k].bufptr;
// if(rem_len > 0) memmove(bufi[k], rem_buf, rem_len);
if(rem_len > 0) memmove(p1, rem_buf, rem_len);
#ifdef _WIN32
// read_ret = _read(my_data->fd, bufi[k] + rem_len, buf_len - rem_len);
read_ret = _read(my_data->fd, p1 + rem_len, buf_len - rem_len);
#else
// read_ret = read(my_data->fd, bufi[k] + rem_len, buf_len - rem_len);
read_ret = read(my_data->fd, p1 + rem_len, buf_len - rem_len);
#endif
DBGPRT1("@readf_thread_func(): thread %ld read_ret1 = %d\n", taskid, read_ret);
if(read_ret == 0)
{
eof_err = 1;
// giReadBufStat[k] = 0;
gtReadBuf[k].bufstat = 0;
goto end;
}
else if(read_ret == -1)
{
eof_err = -1;
// giReadBufStat[k] = 0;
gtReadBuf[k].bufstat = 0;
rc = -1;
goto end;
}
//p = bufi[k] + rem_len;
//p[read_ret] = 0x00;
//strmovcpy(tmpbuf, p1, read_ret + rem_len);
//printf("*** buf_len=%d, read_ret=%d, rem_len=%d, p1=[%s]\n", buf_len, read_ret, rem_len, tmpbuf);
printf("*** buf_len=%d, read_ret=%d, rem_len=%d\n", buf_len, read_ret, rem_len);
if(read_ret == buf_len - rem_len)
{
p = p3 = p1 + buf_len - 1;
while(*p != '\n')
{
if(p3 - p >= FIND_BACK)
{
eof_err = 2;
goto end;
}
p--;
}
p2 = p;
read_ret = p2 + 1 - p1;
rem_buf = p2 + 1;
rem_len = buf_len - read_ret;
//strmovcpy(tmpbuf, rem_buf, rem_len);
//printf("=== buf_len=%d, read_ret=%d, rem_len=%d, rem_buf=[%s]\n", buf_len, read_ret, rem_len, tmpbuf);
printf("=== buf_len=%d, read_ret=%d, rem_len=%d\n", buf_len, read_ret, rem_len);
}
else
{
read_ret += rem_len;
rem_buf = NULL;
rem_len = 0;
}
// bufilen[k] = read_ret;
gtReadBuf[k].buflen = read_ret;
DBGPRT1("--- @readf_thread_func(): thread %ld, block no = #%d, k=%d, len(read_ret)=%d\n", taskid, n, k, read_ret);
// giReadBufStat[k] = 2;
gtReadBuf[k].bufstat = 2;
#ifdef _WIN32
SetEvent(hEvent_r[k]);
#else
pthread_cond_signal(&g_condv_r[k]);
#endif
end:
if(eof_err != 0) break;
}
DBGPRT1("@readf_thread_func(): thread %ld End, rc = %d.\n", taskid, rc);
my_data->err = rc;
rtf_exit = 1;
giRFBN = n;
#ifdef _WIN32
SetEvent(hEvent_bufo);
for(i = 0; i < giTN; i++)
{
SetEvent(hEvent_r[i]);
}
_endthreadex( 0 );
return 0;
#else
pthread_cond_signal(&bufo_threshold_cv);
for(i = 0; i < giTN; i++)
{
pthread_cond_signal(&g_condv_r[i]);
}
pthread_exit(NULL);
#endif
}
#ifdef _WIN32
unsigned __stdcall proc_thread_func(void *threadarg)
#else
void *proc_thread_func(void *threadarg)
#endif
{
struct thread_data *my_data;
long taskid;
int rc = 0, i = 0, k;
my_data = (struct thread_data *) threadarg;
taskid = my_data->thread_id;
DBGPRT2("@proc_thread_func(), taskid=%ld\n", taskid);
while(1)
{
#ifdef _WIN32
// if((rtf_exit == 1 || eof_err != 0) && giReadBufStat[taskid] == 0) goto end;
if((rtf_exit == 1 || eof_err != 0) && gtReadBuf[taskid].bufstat == 0) goto end;
// if(giReadBufStat[taskid] != 2)
if(gtReadBuf[taskid].bufstat != 2)
{
DBGPRT2("@proc_thread_func(): thread %ld, before WaitForSingleObject(hEvent_r).\n", taskid);
WaitForSingleObject( hEvent_r[taskid], INFINITE );
DBGPRT2("@proc_thread_func(): thread %ld, after WaitForSingleObject(hEvent_r).\n", taskid);
}
// if(giWriteBufStat[taskid] != 0)
if(gtWriteBuf[taskid].bufstat != 0)
{
DBGPRT2("@proc_thread_func(): thread %ld, before WaitForSingleObject(hEvent_w).\n", taskid);
WaitForSingleObject( hEvent_w[taskid], INFINITE );
DBGPRT2("@proc_thread_func(): thread %ld, after WaitForSingleObject(hEvent_w).\n", taskid);
}
#else
DBGPRT2("@proc_thread_func(): thread %ld locking mutex bufi_mutex. giReadBufStat[taskid]=%d\n", taskid, giReadBufStat[taskid]);
//pthread_mutex_lock(&bufi_mutex);
pthread_mutex_lock(&g_mutex_r[taskid]);
// while (giReadBufStat[taskid] != 2)
while (gtReadBuf[taskid].bufstat != 2)
{
// if((rtf_exit == 1 || eof_err != 0) && giReadBufStat[taskid] == 0) goto end;
if((rtf_exit == 1 || eof_err != 0) && gtReadBuf[taskid].bufstat == 0) goto end;
// DBGPRT2("=== proc_thread_func(): thread %ld, Going into wait bufi. giReadBufStat[taskid]=%d, rtf_exit = %d, eof_err = %d...\n", taskid, giReadBufStat[taskid], rtf_exit, eof_err);
DBGPRT2("=== proc_thread_func(): thread %ld, Going into wait bufi. gtReadBuf[taskid].bufstat=%d, rtf_exit = %d, eof_err = %d...\n", taskid, gtReadBuf[taskid].bufstat, rtf_exit, eof_err);
// pthread_cond_wait(&g_condv_r[taskid], &bufi_mutex);
pthread_cond_wait(&g_condv_r[taskid], &g_mutex_r[taskid]);
DBGPRT2("*** proc_thread_func(): thread %ld Condition signal received. \n", taskid);
}
DBGPRT2("@proc_thread_func(): thread %ld locking mutex bufo_mutex.\n", taskid);
//pthread_mutex_lock(&bufo_mutex);
pthread_mutex_lock(&g_mutex_w[taskid]);
// while (giWriteBufStat[taskid] != 0)
while (gtWriteBuf[taskid].bufstat != 0)
{
DBGPRT2("=== proc_thread_func(): thread %ld, Going into wait bufo...\n", taskid);
// pthread_cond_wait(&g_condv_w[taskid], &bufo_mutex);
pthread_cond_wait(&g_condv_w[taskid], &g_mutex_w[taskid]);
DBGPRT2("*** proc_thread_func(): thread %ld Condition signal received.\n", taskid);
}
DBGPRT2("@proc_thread_func(): thread %ld Unlocking mutex.\n", taskid);
// pthread_mutex_unlock(&bufo_mutex);
// pthread_mutex_unlock(&bufi_mutex);
pthread_mutex_unlock(&g_mutex_w[taskid]);
pthread_mutex_unlock(&g_mutex_r[taskid]);
#endif
// DBGPRT2("@proc_thread_func(): thread %ld Processing file, giReadBufStat[taskid]=%d, giWriteBufStat[taskid]=%d ...\n", taskid, giReadBufStat[taskid], giWriteBufStat[taskid]);
DBGPRT2("@proc_thread_func(): thread %ld Processing file, gtReadBuf[taskid].bufstat=%d, gtWriteBuf[taskid].bufstat=%d ...\n", taskid, gtReadBuf[taskid].bufstat, gtWriteBuf[taskid].bufstat);
//i = 0;
//if(giReadBufStat[taskid] == 2 && giWriteBufStat[taskid] == 0)
{
// giReadBufStat[taskid] = 3;
// giWriteBufStat[taskid] = 3;
gtReadBuf[taskid].bufstat = 3;
gtWriteBuf[taskid].bufstat = 3;
#ifdef _WIN32
//Sleep(random_1(5,20) * 1000);
#else
//sleep(random_1(5,20));
#endif
// for(i = 0; i < bufilen[taskid]; i++)
for(i = 0; i < gtReadBuf[taskid].buflen; i++)
{
// bufo[taskid][i] = toupper(bufi[taskid][i]);
gtWriteBuf[taskid].bufptr[i] = toupper(gtReadBuf[taskid].bufptr[i]);
}
// bufolen[taskid] = bufilen[taskid];
gtWriteBuf[taskid].buflen = gtReadBuf[taskid].buflen;
// giReadBufStat[taskid] = 0;
// giWriteBufStat[taskid] = 4;
gtReadBuf[taskid].bufstat = 0;
gtWriteBuf[taskid].bufstat = 4;
gtWriteBuf[taskid].fileBlockNo = gtReadBuf[taskid].fileBlockNo;
#ifdef _WIN32
DBGPRT2("@proc_thread_func(): thread %ld SetEvent ...\n", taskid);
SetEvent(hEvent_bufi);
SetEvent(hEvent_bufo);
#else
DBGPRT2("@proc_thread_func(): thread %ld pthread_cond_signal ...\n", taskid);
pthread_cond_signal(&bufi_threshold_cv);
pthread_cond_signal(&bufo_threshold_cv);
#endif
}
// if(eof_err != 0 && buf1istat != 1 && buf2istat != 1) break;
}
// printf("proc_thread_func(): thread %ld End, rc = %d, proc_len = %ld.\n", taskid, rc, proc_len);
end:
DBGPRT2("proc_thread_func(): thread %ld End, rc = %d\n", taskid, rc);
my_data->err = rc;
ptf_exit = 1;
#ifdef _WIN32
_endthreadex( 0 );
return 0;
#else
pthread_exit(NULL);
#endif
}
#ifdef _WIN32
unsigned __stdcall writef_thread_func(void *threadarg)
#else
void *writef_thread_func(void *threadarg)
#endif
{
struct thread_data *my_data;
long taskid;
int rc = 0, i = 0, k, iWFBN = 1;
size_t len;
#ifdef _WIN32
__int64 write_len = 0;
int write_ret1;
#else
off_t write_len = 0;
ssize_t write_ret1;
#endif
my_data = (struct thread_data *) threadarg;
taskid = my_data->thread_id;
DBGPRT3("@writef_thread_func(), taskid=%ld, fd=%d\n", taskid, my_data->fd);
while(1)
{
#ifdef _WIN32
#else
DBGPRT3("@writef_thread_func(): thread %ld locking mutex.\n", taskid);
pthread_mutex_lock(&bufo_mutex);
#endif
while (1)
{
#ifdef _WIN32
EnterCriticalSection( &g_cs_bufo );
#else
#endif
for(i = 0, k = -1; i < giTN; i++)
{
// DBGPRT3("@writef_thread_func(): thread %ld, giWriteBufStat[%d]=%d, giFileBlockNo[%d]=%d\n", taskid, i, giWriteBufStat[i], i, giFileBlockNo[i]);
DBGPRT3("@writef_thread_func(): thread %ld, gtWriteBuf[%d].bufstat=%d, gtWriteBuf[%d].fileBlockNo=%d\n", taskid, i, gtWriteBuf[i].bufstat, i, gtWriteBuf[i].fileBlockNo);
// if(giWriteBufStat[i] == 4 && giFileBlockNo[i] == iWFBN)
if(gtWriteBuf[i].bufstat == 4 && gtWriteBuf[i].fileBlockNo == iWFBN)
{
k = i;
break;
}
}
#ifdef _WIN32
if(k >= 0)
{
// giWriteBufStat[k] = 5;
gtWriteBuf[k].bufstat = 5;
LeaveCriticalSection( &g_cs_bufo );
break;
}
LeaveCriticalSection( &g_cs_bufo );
#else
if(k >= 0) break;
#endif
DBGPRT3("=== writef_thread_func(): thread %ld, Going into wait, giRFBN=%d, iWFBN=%d ...\n", taskid, giRFBN, iWFBN);
if((rtf_exit == 1 || eof_err != 0) && giRFBN == iWFBN) break;
#ifdef _WIN32
DBGPRT3("=== writef_thread_func(): thread %ld, before WaitForSingleObject...\n", taskid);
WaitForSingleObject( hEvent_bufo, INFINITE );
DBGPRT3("*** writef_thread_func(): thread %ld, after WaitForSingleObject.\n", taskid);
#else
pthread_cond_wait(&bufo_threshold_cv, &bufo_mutex);
DBGPRT3("*** writef_thread_func(): thread %ld Condition signal received.\n", taskid);
#endif
}
#ifdef _WIN32
#else
// giWriteBufStat[k] = 5;
gtWriteBuf[k].bufstat = 5;
pthread_mutex_unlock(&bufo_mutex);
#endif
// len = bufolen[k];
len = gtWriteBuf[k].buflen;
DBGPRT3("@writef_thread_func(): thread %ld Writing file, k=%d, len=%d ...\n", taskid, k, len);
#ifdef _WIN32
// write_ret1 = _write(my_data->fd, bufo[k], len);
write_ret1 = _write(my_data->fd, gtWriteBuf[k].bufptr, len);
#else
// write_ret1 = write(my_data->fd, bufo[k], len);
write_ret1 = write(my_data->fd, gtWriteBuf[k].bufptr, len);
#endif
write_len += write_ret1;
if(write_ret1 != len)
{
#ifdef _WIN32
printf("Error: _write, GetLastError()=%d, errno=%d(%s)\n", GetLastError(), errno, strerror(errno));
#else
printf("Error: write失败, errno=%d(%s)\n", errno, strerror(errno));
#endif
rc = -1;
goto end;
}
#ifdef _WIN32
_commit(my_data->fd);
#endif
// giWriteBufStat[k] = 0;
gtWriteBuf[k].bufstat = 0;
// giWriteBufStat[giWriteBufIdx % giTN] = 0;
// DBGPRT3("@writef_thread_func(): thread %ld pthread_cond_signal, giWriteBufIdx=%d ...\n", taskid, giWriteBufIdx);
// pthread_cond_signal(&g_condv_w[giWriteBufIdx % giTN]);
#ifdef _WIN32
SetEvent(hEvent_w[k]);
#else
DBGPRT3("@writef_thread_func(): thread %ld pthread_cond_signal, write_ret1=%d ...\n", taskid, write_ret1);
pthread_cond_signal(&g_condv_w[k]);
#endif
end:
// DBGPRT3("writef_thread_func(): thread %ld Unlocking mutex. buf1ostat = %d, buf2ostat = %d, rc = %d, eof_err = %d, rtf_exit = %d, ptf_exit = %d\n", taskid, buf1ostat, buf2ostat, rc, eof_err, rtf_exit, ptf_exit);
//pthread_mutex_unlock(&bufo_mutex);
if(rc < 0) break;
//if(buf1ostat == 2 || buf2ostat == 2) continue;
//if(eof_err != 0) break;
// if(rtf_exit != 0 && ptf_exit != 0) break;
if((rtf_exit == 1 || eof_err != 0) && giRFBN == iWFBN) break;
iWFBN++;
}
DBGPRT3("writef_thread_func(): thread %ld End, rc = %d, write_len = %ld.\n", taskid, rc, write_len);
wtf_exit = 1;
for(i = 0; i < giTN; i++)
{
#ifdef _WIN32
SetEvent(hEvent_r[i]);
#else
pthread_cond_signal(&g_condv_r[i]);
#endif
}
my_data->err = rc;
#ifdef _WIN32
_endthreadex( 0 );
return 0;
#else
pthread_exit(NULL);
#endif
}
int main(int argc, char *argv[])
{
#ifdef _WIN32
HANDLE hThread[MAX_PT_NUM];
unsigned uiThreadID[MAX_PT_NUM];
#else
pthread_t threads[MAX_PT_NUM];
#endif
int rc, *taskids[MAX_PT_NUM];
int i, t = 0;
int fd1, fd2;
struct timeval begin, current;
double elapsed;
if(argc < 3)
{
printf("多线程练习程序,将输入文件每行变大写,存为输出文件\n");
printf("用法:%s 输入文件名 输出文件名 [maxBufLen(MB) [maxThreadNum]]\n", argv[0]);
return 1;
}
if(argc >= 4) buf_len = atoi(argv[3]) * 1024 * 1024L;
if(argc >= 5) giTN = atoi(argv[4]);
gettimeofday(&begin, 0);
#ifdef _WIN32
if((fd1 = _open(argv[1], _O_BINARY)) == -1)
#else
if((fd1 = open64(argv[1], O_RDONLY | O_LARGEFILE)) == -1)
#endif
{
printf("Open file error!\n");
return -1;
}
#ifdef _WIN32
if((fd2 = _open(argv[2], _O_BINARY | _O_WRONLY | _O_CREAT, _S_IREAD | _S_IWRITE)) == -1)
#else
if((fd2 = open64(argv[2], O_CREAT | O_WRONLY | O_TRUNC | O_LARGEFILE,
S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)) == -1)
#endif
{
printf("Create file error!\n");
#ifdef _WIN32
_close(fd1);
#else
close(fd1);
#endif
return -2;
}
DBGPRT("--- fd1=%d, fd2=%d\n", fd1, fd2);
srand(time(0));
/*
memset(giReadBufStat, 0x00, sizeof(giReadBufStat));
memset(giWriteBufStat, 0x00, sizeof(giWriteBufStat));
memset(giFileBlockNo, 0x00, sizeof(giFileBlockNo));
*/
memset(gtReadBuf, 0x00, sizeof(gtReadBuf));
memset(gtWriteBuf, 0x00, sizeof(gtWriteBuf));
#ifdef _WIN32
InitializeCriticalSection( &g_cs_bufi );
InitializeCriticalSection( &g_cs_bufo );
hEvent_bufi = CreateEvent( NULL, FALSE, FALSE, NULL );
hEvent_bufo = CreateEvent( NULL, FALSE, FALSE, NULL );
#else
#endif
for(i = 0; i < giTN; i++)
{
// if((bufi[i] = (char *)malloc(buf_len + 1)) == NULL)
if((gtReadBuf[i].bufptr = (char *)malloc(buf_len + 1)) == NULL)
{
printf("Insufficient memory available for bufi[%d]\n", i);
return -1;
}
// if((bufo[i] = (char *)malloc(buf_len + 1)) == NULL)
if((gtWriteBuf[i].bufptr = (char *)malloc(buf_len + 1)) == NULL)
{
printf("Insufficient memory available for bufo[%d]\n", i);
return -1;
}
#ifdef _WIN32
hEvent_r[i] = CreateEvent( NULL, FALSE, FALSE, NULL );
hEvent_w[i] = CreateEvent( NULL, FALSE, FALSE, NULL );
#else
/*
g_mutex_r[i] = PTHREAD_MUTEX_INITIALIZER;
g_mutex_w[i] = PTHREAD_MUTEX_INITIALIZER;
g_condv_r[i] = PTHREAD_COND_INITIALIZER;
g_condv_w[i] = PTHREAD_COND_INITIALIZER;
*/
pthread_mutex_init(&g_mutex_r[i], NULL);
pthread_mutex_init(&g_mutex_w[i], NULL);
pthread_cond_init (&g_condv_r[i], NULL);
pthread_cond_init (&g_condv_w[i], NULL);
#endif
}
t = giTN+1;
thread_data_array[t].thread_id = t;
thread_data_array[t].err = 0;
thread_data_array[t].fd = fd2;
DBGPRT("Creating thread %d\n", t);
#ifdef _WIN32
hThread[t] = (HANDLE)_beginthreadex( NULL, // security
0, // stack size
&writef_thread_func,
(void *) &thread_data_array[t], // arg list
0, //initflag
&uiThreadID[t] );
if ( hThread[t] == 0 )
{
printf("ERROR: Failed to create thread: _beginthreadex(writef_thread_func)\n");
exit(-1);
}
#else
rc = pthread_create(&threads[t], NULL, writef_thread_func, (void *) &thread_data_array[t]);
if (rc)
{
printf("ERROR: return code from pthread_create(writef_thread_func) is %d\n", rc);
exit(-1);
}
#endif
for(t = 0; t < giTN; t++)
{
thread_data_array[t].thread_id = t;
thread_data_array[t].err = 0;
thread_data_array[t].fd = 0;
DBGPRT("Creating thread %d\n", t);
#ifdef _WIN32
hThread[t] = (HANDLE)_beginthreadex( NULL, // security
0, // stack size
&proc_thread_func,
(void *) &thread_data_array[t], // arg list
0, //initflag
&uiThreadID[t] );
if ( hThread[t] == 0 )
{
printf("ERROR: Failed to create thread: _beginthreadex(proc_thread_func)\n");
exit(-1);
}
#else
rc = pthread_create(&threads[t], NULL, proc_thread_func, (void *) &thread_data_array[t]);
if (rc)
{
printf("ERROR: return code from pthread_create(proc_thread_func) is %d\n", rc);
exit(-1);
}
#endif
}
//printf("t= %d\n", t);
thread_data_array[t].thread_id = t;
thread_data_array[t].err = 0;
thread_data_array[t].fd = fd1;
DBGPRT("Creating thread %d\n", t);
#ifdef _WIN32
hThread[t] = (HANDLE)_beginthreadex( NULL, // security
0, // stack size
&readf_thread_func,
(void *) &thread_data_array[t], // arg list
0, //initflag
&uiThreadID[t] );
if ( hThread[t] == 0 )
{
printf("ERROR: Failed to create thread: _beginthreadex(readf_thread_func)\n");
exit(-1);
}
#else
rc = pthread_create(&threads[t], NULL, readf_thread_func, (void *) &thread_data_array[t]);
if (rc)
{
printf("ERROR: return code from pthread_create(readf_thread_func) is %d\n", rc);
exit(-1);
}
#endif
/* Wait for all threads to complete */
#ifdef _WIN32
for (i = 0; i < giTN+2; i++)
{
DBGPRT("--- WaitForSingleObject(hThread[%d])...\n", i);
WaitForSingleObject( hThread[i], INFINITE );
}
#else
DBGPRT("--- pthread_join()...\n");
for (i = 0; i < giTN+2; i++)
{
pthread_join(threads[i], NULL);
//pthread_join(threads[i], (void **)&ret_val);
//printf("i = %d, pthread_join retval = %d\n", i, *(int *) retval);
//printf("i = %d, pthread_join retval = %d\n", i, (int)ret_val);
DBGPRT("=== After pthread_join(), i = %d, err = %d\n", i, thread_data_array[i].err);
}
#endif
/* Clean up and exit */
#ifdef _WIN32
DeleteCriticalSection( &g_cs_bufi );
DeleteCriticalSection( &g_cs_bufo );
#else
for(i = 0; i < giTN; i++)
{
pthread_mutex_destroy(&g_mutex_r[i]);
pthread_mutex_destroy(&g_mutex_w[i]);
pthread_cond_destroy(&g_condv_r[i]);
pthread_cond_destroy(&g_condv_w[i]);
}
pthread_mutex_destroy(&bufi_mutex);
pthread_mutex_destroy(&bufo_mutex);
pthread_cond_destroy(&bufi_threshold_cv);
pthread_cond_destroy(&bufo_threshold_cv);
#endif
gettimeofday(¤t, (struct timezone*) 0);
elapsed = (current.tv_sec - begin.tv_sec) +
((current.tv_usec - begin.tv_usec) / 1000000.0F);
printf("--- 耗时 %f 秒\n", elapsed);
#ifdef _WIN32
// Destroy the thread object.
for (i = 0; i < giTN+2; i++)
{
CloseHandle( hThread[i] );
}
#else
pthread_exit (NULL);
#endif
return 0;
}