| 网站首页 | 业界新闻 | 小组 | 威客 | 人才 | 下载频道 | 博客 | 代码贴 | 在线编程 | 编程论坛
欢迎加入我们,一同切磋技术
用户名:   
 
密 码:  
共有 831 人关注过本帖
标题:请教VC6 /LINUX GCC 多线程处理文本文件的问题
取消只看楼主 加入收藏
NB2011
Rank: 1
等 级:新手上路
帖 子:8
专家分:5
注 册:2011-7-17
结帖率:100%
收藏
 问题点数:0 回复次数:1 
请教VC6 /LINUX GCC 多线程处理文本文件的问题
需要对一个很大的文本文件(例如 8~9GB)做处理。单线程的做法是循环执行:读文件、处理、写输出文件。
发现处理部分比较耗时,所以想到是否可以用多线程来处理:读文件 1 个线程、处理 N 个线程、写文件 1 个线程?

 对处理的 N 个线程,想预先分配好最大的 N 个输入 BUFFER,但是输出 BUFFER 比较纠结,是预先申请好最大的 N 个 BUFFER?还是用的时候临时申请好呢(用完再释放)?

I/O buffer与线程的对应关系(想通过这个来防止惊群问题):
inBuffer[0] -->  处理线程1  --> outBuffer[0]
 inBuffer[1] -->  处理线程2  --> outBuffer[1]
 ...
 inBuffer[n-1] -->  处理线程n  --> outBuffer[n-1]

对每个buffer,记录它的状态:inUse 还是 free 等

1、读文件的线程循环检查这N个buffer,看到哪个free,就读文件(M行,按inBuffer最大值读,往回找回车换行)到其中,然后事件通知对应的处理线程。
2、处理线程 i 接到事件后,知道 inBuffer[i-1] 有待处理内容,再看 outBuffer[i-1] 是否空闲:
 若是,开始处理,处理结束给写文件的线程发事件通知
 若否,等待写文件的线程给它通知(感觉这里会成为瓶颈)
3、写文件的线程:为了保证写文件的顺序性,接到处理线程的事件通知后,需检查 outBuffer[i-1] 对应的文件块号是否为已写块号+1,若是,则写入输出文件,若否,继续等待

 本人刚学多线程,上面的设计是否有严重问题?应该怎么设计?
Linux 的 pthread_cond_wait,在 Windows 下是否可以用 WaitForSingleObject 和 CriticalSection 结合来模拟?
 请各位高手不吝指教,谢谢!

源码如下,请各位多多批评指正!谢谢!
/* 多线程练习程序,将输入文件每行变大写,存为输出文件
读文件RT:处理PT:写文件WT=1:N:1 测试
cc -DDBG -DDBG1 -DDBG2 -DDBG3 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE -D_LARGEFILE64_SOURCE mt1n1.c -o mt1n1 -lpthread
*/

#ifdef _WIN32
    #if !defined(_MT)
    #error _beginthreadex requires a multithreaded C run-time library.
    #endif

    #define STRICT 1
    #define WIN32_LEAN_AND_MEAN
    #include <windows.h>
    #include <process.h>    /* _beginthread, _endthread */

    #include <io.h>            //_open(),_lseeki64(),_read(),_write(),_close()
    #include <fcntl.h>      //_O_BINARY
    #include <sys/stat.h>   //_S_IWRITE
    #include <time.h>        //_tzset()
#else
    #include <pthread.h>
    #include <sys/types.h>    //off64_t
    #include <sys/stat.h>   //off_t, S_IRGRP,...
    #include <sys/time.h>   //gettimeofday()
    #include <unistd.h>        //lseek64(),close(),read(),write()
    #include <fcntl.h>      //open64
    #include <time.h>       //time()
#endif

#include <ctype.h>      //toupper()
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/*
 * Debug trace macros, one per channel:
 *   DBGPRT  - main()                 (enabled with -DDBG)
 *   DBGPRT1 - reader thread          (enabled with -DDBG1)
 *   DBGPRT2 - processing threads     (enabled with -DDBG2)
 *   DBGPRT3 - writer thread          (enabled with -DDBG3)
 *
 * Enabled, each macro is simply printf.  Disabled, a call such as
 * DBGPRT("x=%d\n", x); becomes `1 ? (void)0 : (void)printf("x=%d\n", x);`
 * so the call is still syntax- and type-checked but never executed and its
 * arguments are never evaluated.  (The previous `;//` form did not work as
 * intended: comments are stripped before macro expansion, which left the
 * argument list behind as a live comma expression.)  No variadic macros are
 * used so that pre-C99 compilers still accept this.
 * The macros must be used as stand-alone statements.
 */
#ifdef DBG
#define DBGPRT printf
#else
#define DBGPRT 1 ? (void)0 : (void)printf
#endif

#ifdef DBG1
#define DBGPRT1 printf
#else
#define DBGPRT1 1 ? (void)0 : (void)printf
#endif

#ifdef DBG2
#define DBGPRT2 printf
#else
#define DBGPRT2 1 ? (void)0 : (void)printf
#endif

#ifdef DBG3
#define DBGPRT3 printf
#else
#define DBGPRT3 1 ? (void)0 : (void)printf
#endif

// Random integer in the half-open range [a,b): (rand() % (b-a)) + a.
// Arguments are parenthesised so that expressions such as random_1(lo+1, hi)
// expand correctly.  Requires b > a; each argument is evaluated more than once.
#define random_1(a,b) ((rand() % ((b) - (a))) + (a))
// Random integer in the closed range [a,b]: (rand() % (b-a+1)) + a.
// Requires b >= a; each argument is evaluated more than once.
#define random_2(a,b) ((rand() % ((b) - (a) + 1)) + (a))

/*
 * Read-buffer geometry.
 *
 * BUF_LEN   - default size of every input/output buffer.
 * FIND_BACK - how far back from the end of a completely filled buffer the
 *             reader searches for the last '\n' so that a block always ends
 *             on a line boundary.
 *
 * The values below are deliberately tiny for testing; production-sized
 * values would be (16*1024*1024) and (64*1024):
 *
 *   |    16MB          |                   |
 *   |------------------|-------------------|
 *   |         |<-64KB--|
 *   p1           p2    p3
 *
 * Both macros are parenthesised so they can be used inside larger
 * expressions (e.g. x / BUF_LEN) without precedence surprises.
 */
#define BUF_LEN     (1*1024)  /* 1KB */
#define FIND_BACK   (256)     /* 0.25KB */

/* Effective buffer size; main() overrides it from the command line. */
size_t buf_len = BUF_LEN;

/* Maximum number of processing threads */
#define MAX_PT_NUM  60

/*
 * One buffer slot of the pipeline (used for both the input and the output
 * buffer arrays).
 */
struct buf_t
{
    /* Slot state: 0:free, 1:reading, 2:read end, 3:processing,
       4:proc end, 5:writing */
    int   bufstat;
    /* Sequence number of the file block currently held in the slot;
       the reader numbers blocks starting at 1. */
    int   fileBlockNo;
    /* Number of valid bytes stored at bufptr. */
    int   buflen;
    /* Heap buffer allocated by main(). */
    char *bufptr;
};

typedef struct buf_t BUF_T;

/*
 * Synchronisation objects shared by the reader, processing and writer
 * threads.
 *
 * Win32:
 *   g_cs_bufi / g_cs_bufo - guard the reader's scan for a free input buffer
 *                           and the writer's scan for a finished output buffer.
 *   hEvent_bufi           - set by a processing thread, waited on by the reader.
 *   hEvent_bufo           - set by processing threads and the reader, waited
 *                           on by the writer.
 *   hEvent_r[i]           - waited on by processing thread i for input.
 *   hEvent_w[i]           - set by the writer, waited on by processing thread i.
 * POSIX:
 *   bufi_mutex / bufi_threshold_cv, bufo_mutex / bufo_threshold_cv and the
 *   per-thread g_mutex_r[i] / g_condv_r[i], g_mutex_w[i] / g_condv_w[i]
 *   play the same roles.  The per-thread objects are initialised in main().
 */
#ifdef _WIN32
CRITICAL_SECTION g_cs_bufi, g_cs_bufo;
HANDLE hEvent_bufi, hEvent_bufo, hEvent_r[MAX_PT_NUM], hEvent_w[MAX_PT_NUM];
#else
pthread_mutex_t g_mutex_r[MAX_PT_NUM], g_mutex_w[MAX_PT_NUM], bufi_mutex = PTHREAD_MUTEX_INITIALIZER, bufo_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t g_condv_r[MAX_PT_NUM], g_condv_w[MAX_PT_NUM], bufi_threshold_cv = PTHREAD_COND_INITIALIZER, bufo_threshold_cv = PTHREAD_COND_INITIALIZER;
#endif
/* Superseded by the BUF_T arrays below (kept for reference):
int  giReadBufStat[MAX_PT_NUM];  //0:free,1:reading,2:read end,3:processing,4:proc end->0
int  giWriteBufStat[MAX_PT_NUM]; //0:free,3:processing,4:proc end,5:writing,6:write end->0
int  giWriteBufIdx = 0;
int  giFileBlockNo[MAX_PT_NUM], giRFBN = 0;
*/
/* Block counter of the reader at the moment it stopped; the writer compares
   it with the number of the block it is waiting for to decide when to quit. */
int  giRFBN = 0;
/* Input slot i and output slot i both belong to processing thread i. */
BUF_T gtReadBuf[MAX_PT_NUM];
BUF_T gtWriteBuf[MAX_PT_NUM];

// Number of processing threads (default 3, overridable from the command line)
int  giTN = 3;
/* eof_err: 0 = still reading, 1 = end of input, -1 = read error,
            2 = no '\n' found within FIND_BACK bytes of a full buffer.
   rtf_exit / ptf_exit / wtf_exit: set to 1 when the reader / a processing
   thread / the writer has finished.
   NOTE(review): these flags are plain ints shared between threads. */
int  eof_err = 0, rtf_exit = 0, ptf_exit = 0, wtf_exit = 0;
//char *bufi[MAX_PT_NUM], *bufo[MAX_PT_NUM];
//int  bufilen[MAX_PT_NUM], bufolen[MAX_PT_NUM];

/*
 * Per-thread argument block handed to every thread start routine.
 */
struct thread_data
{
    long thread_id;     /* slot index assigned by main(); processing threads
                           also use it as their buffer index */
    int  err;           /* result code stored by the thread when it ends */
    int  fd;            /* input fd for the reader, output fd for the writer,
                           0 for processing threads */
    /* offset / len are not used by the code in this file. */
#ifdef _WIN32
    __int64 offset;
    __int64 len;
#else
    off_t offset;
    off_t len;
#endif
};
/* Slots 0..giTN-1: processing threads, giTN: reader, giTN+1: writer. */
struct thread_data thread_data_array[MAX_PT_NUM+2];

/*
 * Offset subtracted from a FILETIME value to convert it to the Unix epoch.
 * DELTA_EPOCH_IN_MICROSECS is expressed in microseconds and is applied by
 * gettimeofday() after the FILETIME has been divided by 10;
 * DELTA_EPOCH_IN_MICROSECS10 is the same value multiplied by 10 and is not
 * referenced by the code in this file.
 * MSVC needs the Ui64 suffix, other compilers use ULL.
 */
#if defined(_MSC_VER) || defined(_MSC_EXTENSIONS)
    #define DELTA_EPOCH_IN_MICROSECS    11644473600000000Ui64
    #define DELTA_EPOCH_IN_MICROSECS10    116444736000000000Ui64
#else
    #define DELTA_EPOCH_IN_MICROSECS    11644473600000000ULL
    #define DELTA_EPOCH_IN_MICROSECS10    116444736000000000ULL
#endif

#if defined(_MSC_VER)
/* Time value filled in by the gettimeofday() replacement below
   (only compiled for MSVC). */
struct timeval {
        long    tv_sec;         /* seconds */
        long    tv_usec;        /* and microseconds */
};

/* Time-zone information filled in by the gettimeofday() replacement below. */
struct timezone
  {
    int tz_minuteswest;        /* Minutes west of GMT.  */
    int tz_dsttime;        /* Nonzero if DST is ever in effect.  */
  };

int gettimeofday(struct timeval *tv, struct timezone *tz)
{
    FILETIME ft;
    unsigned __int64 tmpres = 0;
    static int tzflag = 0;

    if (NULL != tv)
    {
        GetSystemTimeAsFileTime(&ft);

        tmpres |= ft.dwHighDateTime;
        tmpres <<= 32;
        tmpres |= ft.dwLowDateTime;

        tmpres /= 10;  /*convert into microseconds*/
        /*converting file time to unix epoch*/
        tmpres -= DELTA_EPOCH_IN_MICROSECS;
        tv->tv_sec = (long)(tmpres / 1000000UL);
        tv->tv_usec = (long)(tmpres % 1000000UL);
    }

    if (NULL != tz)
    {
        if (!tzflag)
        {
            _tzset();
            tzflag++;
        }
        tz->tz_minuteswest = _timezone / 60;
        tz->tz_dsttime = _daylight;
    }

    return 0;
}

/* ------------------------------------------------------------------------- */
/*
 * Returns the system message text for the Win32 error code GLE.
 *
 * The text is formatted straight into a static buffer, so the returned
 * pointer stays valid after the call (the previous version read through an
 * uninitialised pointer and returned memory it had already released with
 * LocalFree).  Trailing line breaks and blanks are removed.  If the system
 * has no text for the code, "unknown Windows error" is returned.
 *
 * The buffer is shared: the result is overwritten by the next call and the
 * function must not be called from several threads at the same time.
 */
char *GetWin32ErrMsg(DWORD GLE)
{
    static char szMessage[512];
    DWORD n;

    n = FormatMessageA(FORMAT_MESSAGE_IGNORE_INSERTS |
                       FORMAT_MESSAGE_FROM_SYSTEM |
                       FORMAT_MESSAGE_MAX_WIDTH_MASK,
        NULL,
        GLE,
        MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
        szMessage,
        (DWORD)sizeof(szMessage),
        NULL);
    if (n == 0)
    {
        strcpy(szMessage, "unknown Windows error");
        return szMessage;
    }

    /* n is the number of characters stored, not counting the terminator. */
    while (n > 0 && (szMessage[n-1] == '\n' || szMessage[n-1] == '\r' ||
                     szMessage[n-1] == ' '))
    {
        szMessage[--n] = 0;
    }
    return szMessage;
}
#endif

/* ------------------------------------------------------------------------- */
/*
 * Bounded string copy: moves n bytes from src to dest (the two regions may
 * overlap) and then stores a terminating NUL at dest[n].
 * dest must therefore have room for n + 1 bytes.
 * Returns dest.
 */
void *strmovcpy(void *dest, const void *src, size_t n)
{
  char *out = (char *)dest;

  memmove(out, src, n);
  out[n] = '\0';
  return dest;
}

#ifdef _WIN32
unsigned __stdcall readf_thread_func(void *threadarg)
#else
void *readf_thread_func(void *threadarg)
#endif
{
    struct thread_data *my_data;
    long taskid;
    int  rc = 0, read_ret;
    int  i, k, n;
    char *rem_buf = NULL;
    char *p, *p1, *p2, *p3;
    int  rem_len = 0;
    char tmpbuf[BUF_LEN+1];

    my_data = (struct thread_data *) threadarg;
    taskid = my_data->thread_id;
    DBGPRT1("@readf_thread_func(), taskid=%ld\n", taskid);

    for(n = 1; ; n++)
    {
        DBGPRT1("@readf_thread_func(): thread %ld locking mutex.\n", taskid);
#ifdef _WIN32
#else
        pthread_mutex_lock(&bufi_mutex);
#endif
        while (1)
        {
#ifdef _WIN32
        EnterCriticalSection( &g_cs_bufi );
#else
#endif
            for(i = 0, k = -1; i < giTN; i++)
            {
//                if(giReadBufStat[i] == 0)
                if(gtReadBuf[i].bufstat == 0)
                {
                    k = i;
                    break;
                }
            }
#ifdef _WIN32
            if(k >= 0)
            {
//                giReadBufStat[k] = 1;
//                giFileBlockNo[k] = n;
                gtReadBuf[k].bufstat = 1;
                gtReadBuf[k].fileBlockNo = n;
                LeaveCriticalSection( &g_cs_bufi );
                break;
            }
            LeaveCriticalSection( &g_cs_bufi );
#else
            if(k >= 0) break;
#endif
#ifdef _WIN32
            DBGPRT1("=== readf_thread_func(): thread %ld, before WaitForSingleObject...\n", taskid);
            WaitForSingleObject( hEvent_bufi, INFINITE );
            DBGPRT1("*** readf_thread_func(): thread %ld, after WaitForSingleObject.\n", taskid);
#else
            DBGPRT1("=== readf_thread_func(): thread %ld, Going into wait...\n", taskid);
            pthread_cond_wait(&bufi_threshold_cv, &bufi_mutex);
            DBGPRT1("*** readf_thread_func(): thread %ld Condition signal received.\n", taskid);
#endif
        }
        DBGPRT1("@readf_thread_func(): thread %ld Reading file, k=%d ...\n", taskid, k);

#ifdef _WIN32
#else
//        giReadBufStat[k] = 1;
//        giFileBlockNo[k] = n;
        gtReadBuf[k].bufstat = 1;
        gtReadBuf[k].fileBlockNo = n;
        pthread_mutex_unlock(&bufi_mutex);
#endif
        DBGPRT1("@readf_thread_func(): thread %ld Unlocking mutex.\n", taskid);

//        p1 = bufi[k];
        p1 = gtReadBuf[k].bufptr;
//        if(rem_len > 0) memmove(bufi[k], rem_buf, rem_len);
        if(rem_len > 0) memmove(p1, rem_buf, rem_len);
#ifdef _WIN32
//        read_ret = _read(my_data->fd, bufi[k] + rem_len, buf_len - rem_len);
        read_ret = _read(my_data->fd, p1 + rem_len, buf_len - rem_len);
#else
//        read_ret = read(my_data->fd, bufi[k] + rem_len, buf_len - rem_len);
        read_ret = read(my_data->fd, p1 + rem_len, buf_len - rem_len);
#endif
        DBGPRT1("@readf_thread_func(): thread %ld read_ret1 = %d\n", taskid, read_ret);
        if(read_ret == 0)
        {
            eof_err = 1;
//            giReadBufStat[k] = 0;
            gtReadBuf[k].bufstat = 0;
            goto end;
        }
        else if(read_ret == -1)
        {
            eof_err = -1;
//            giReadBufStat[k] = 0;
            gtReadBuf[k].bufstat = 0;
            rc = -1;
            goto end;
        }
        //p = bufi[k] + rem_len;
        //p[read_ret] = 0x00;
        //strmovcpy(tmpbuf, p1, read_ret + rem_len);
        //printf("*** buf_len=%d, read_ret=%d, rem_len=%d, p1=[%s]\n", buf_len, read_ret, rem_len, tmpbuf);
        printf("*** buf_len=%d, read_ret=%d, rem_len=%d\n", buf_len, read_ret, rem_len);
        if(read_ret == buf_len - rem_len)
        {
            p = p3 = p1 + buf_len - 1;
            while(*p != '\n')
            {
                if(p3 - p >= FIND_BACK)
                {
                    eof_err = 2;
                    goto end;
                }
                p--;
            }
            p2 = p;
            read_ret = p2 + 1 - p1;
            rem_buf = p2 + 1;
            rem_len = buf_len - read_ret;
            //strmovcpy(tmpbuf, rem_buf, rem_len);
            //printf("=== buf_len=%d, read_ret=%d, rem_len=%d, rem_buf=[%s]\n", buf_len, read_ret, rem_len, tmpbuf);
            printf("=== buf_len=%d, read_ret=%d, rem_len=%d\n", buf_len, read_ret, rem_len);
        }
        else
        {
            read_ret += rem_len;
            rem_buf = NULL;
            rem_len = 0;
        }
//        bufilen[k] = read_ret;
        gtReadBuf[k].buflen = read_ret;
        DBGPRT1("--- @readf_thread_func(): thread %ld, block no = #%d, k=%d, len(read_ret)=%d\n", taskid, n, k, read_ret);
        
//        giReadBufStat[k] = 2;
        gtReadBuf[k].bufstat = 2;
#ifdef _WIN32
        SetEvent(hEvent_r[k]);
#else
        pthread_cond_signal(&g_condv_r[k]);
#endif
end:
        if(eof_err != 0) break;
    }

    DBGPRT1("@readf_thread_func(): thread %ld End, rc = %d.\n", taskid, rc);
    my_data->err = rc;
    rtf_exit = 1;
    giRFBN = n;
#ifdef _WIN32
    SetEvent(hEvent_bufo);
    for(i = 0; i < giTN; i++)
    {
        SetEvent(hEvent_r[i]);
    }
    _endthreadex( 0 );
    return 0;
#else
    pthread_cond_signal(&bufo_threshold_cv);
    for(i = 0; i < giTN; i++)
    {
        pthread_cond_signal(&g_condv_r[i]);
    }
    pthread_exit(NULL);
#endif
}

#ifdef _WIN32
unsigned __stdcall proc_thread_func(void *threadarg)
#else
void *proc_thread_func(void *threadarg)
#endif
{
    struct thread_data *my_data;
    long taskid;
    int rc = 0, i = 0, k;

    my_data = (struct thread_data *) threadarg;
    taskid = my_data->thread_id;
    DBGPRT2("@proc_thread_func(), taskid=%ld\n", taskid);

    while(1)
    {
#ifdef _WIN32
//        if((rtf_exit == 1 || eof_err != 0) && giReadBufStat[taskid] == 0) goto end;
        if((rtf_exit == 1 || eof_err != 0) && gtReadBuf[taskid].bufstat == 0) goto end;
//        if(giReadBufStat[taskid] != 2)
        if(gtReadBuf[taskid].bufstat != 2)
        {
            DBGPRT2("@proc_thread_func(): thread %ld, before WaitForSingleObject(hEvent_r).\n", taskid);
            WaitForSingleObject( hEvent_r[taskid], INFINITE );
            DBGPRT2("@proc_thread_func(): thread %ld, after WaitForSingleObject(hEvent_r).\n", taskid);
        }
//        if(giWriteBufStat[taskid] != 0)
        if(gtWriteBuf[taskid].bufstat != 0)
        {
            DBGPRT2("@proc_thread_func(): thread %ld, before WaitForSingleObject(hEvent_w).\n", taskid);
            WaitForSingleObject( hEvent_w[taskid], INFINITE );
            DBGPRT2("@proc_thread_func(): thread %ld, after WaitForSingleObject(hEvent_w).\n", taskid);
        }
#else
        DBGPRT2("@proc_thread_func(): thread %ld locking mutex bufi_mutex. giReadBufStat[taskid]=%d\n", taskid, giReadBufStat[taskid]);
        //pthread_mutex_lock(&bufi_mutex);
        pthread_mutex_lock(&g_mutex_r[taskid]);
//        while (giReadBufStat[taskid] != 2)
        while (gtReadBuf[taskid].bufstat != 2)
        {
//            if((rtf_exit == 1 || eof_err != 0) && giReadBufStat[taskid] == 0) goto end;
            if((rtf_exit == 1 || eof_err != 0) && gtReadBuf[taskid].bufstat == 0) goto end;
//            DBGPRT2("=== proc_thread_func(): thread %ld, Going into wait bufi. giReadBufStat[taskid]=%d, rtf_exit = %d, eof_err = %d...\n", taskid, giReadBufStat[taskid], rtf_exit, eof_err);
            DBGPRT2("=== proc_thread_func(): thread %ld, Going into wait bufi. gtReadBuf[taskid].bufstat=%d, rtf_exit = %d, eof_err = %d...\n", taskid, gtReadBuf[taskid].bufstat, rtf_exit, eof_err);
//            pthread_cond_wait(&g_condv_r[taskid], &bufi_mutex);
            pthread_cond_wait(&g_condv_r[taskid], &g_mutex_r[taskid]);
            DBGPRT2("*** proc_thread_func(): thread %ld Condition signal received. \n", taskid);
        }
        DBGPRT2("@proc_thread_func(): thread %ld locking mutex bufo_mutex.\n", taskid);
        //pthread_mutex_lock(&bufo_mutex);
        pthread_mutex_lock(&g_mutex_w[taskid]);
//        while (giWriteBufStat[taskid] != 0)
        while (gtWriteBuf[taskid].bufstat != 0)
        {
            DBGPRT2("=== proc_thread_func(): thread %ld, Going into wait bufo...\n", taskid);
//            pthread_cond_wait(&g_condv_w[taskid], &bufo_mutex);
            pthread_cond_wait(&g_condv_w[taskid], &g_mutex_w[taskid]);
            DBGPRT2("*** proc_thread_func(): thread %ld Condition signal received.\n", taskid);
        }
        DBGPRT2("@proc_thread_func(): thread %ld Unlocking mutex.\n", taskid);
//        pthread_mutex_unlock(&bufo_mutex);
//        pthread_mutex_unlock(&bufi_mutex);
        pthread_mutex_unlock(&g_mutex_w[taskid]);
        pthread_mutex_unlock(&g_mutex_r[taskid]);
#endif
        
//        DBGPRT2("@proc_thread_func(): thread %ld Processing file, giReadBufStat[taskid]=%d, giWriteBufStat[taskid]=%d ...\n", taskid, giReadBufStat[taskid], giWriteBufStat[taskid]);
        DBGPRT2("@proc_thread_func(): thread %ld Processing file, gtReadBuf[taskid].bufstat=%d, gtWriteBuf[taskid].bufstat=%d ...\n", taskid, gtReadBuf[taskid].bufstat, gtWriteBuf[taskid].bufstat);
        //i = 0;
        //if(giReadBufStat[taskid] == 2 && giWriteBufStat[taskid] == 0)
        {
//            giReadBufStat[taskid] = 3;
//            giWriteBufStat[taskid] = 3;
            gtReadBuf[taskid].bufstat = 3;
            gtWriteBuf[taskid].bufstat = 3;
#ifdef _WIN32
            //Sleep(random_1(5,20) * 1000);
#else
            //sleep(random_1(5,20));
#endif
//            for(i = 0; i < bufilen[taskid]; i++)
            for(i = 0; i < gtReadBuf[taskid].buflen; i++)
            {
//                bufo[taskid][i] = toupper(bufi[taskid][i]);
                gtWriteBuf[taskid].bufptr[i] = toupper(gtReadBuf[taskid].bufptr[i]);
            }
//            bufolen[taskid] = bufilen[taskid];
            gtWriteBuf[taskid].buflen = gtReadBuf[taskid].buflen;

//            giReadBufStat[taskid] = 0;
//            giWriteBufStat[taskid] = 4;
            gtReadBuf[taskid].bufstat = 0;
            gtWriteBuf[taskid].bufstat = 4;
            gtWriteBuf[taskid].fileBlockNo = gtReadBuf[taskid].fileBlockNo;
#ifdef _WIN32
            DBGPRT2("@proc_thread_func(): thread %ld SetEvent ...\n", taskid);
            SetEvent(hEvent_bufi);
            SetEvent(hEvent_bufo);
#else
            DBGPRT2("@proc_thread_func(): thread %ld pthread_cond_signal ...\n", taskid);
            pthread_cond_signal(&bufi_threshold_cv);
            pthread_cond_signal(&bufo_threshold_cv);
#endif
        }
//        if(eof_err != 0 && buf1istat != 1 && buf2istat != 1) break;
    }
//    printf("proc_thread_func(): thread %ld End, rc = %d, proc_len = %ld.\n", taskid, rc, proc_len);
end:
    DBGPRT2("proc_thread_func(): thread %ld End, rc = %d\n", taskid, rc);
    my_data->err = rc;
    ptf_exit = 1;
#ifdef _WIN32
    _endthreadex( 0 );
    return 0;
#else
    pthread_exit(NULL);
#endif
}

#ifdef _WIN32
unsigned __stdcall writef_thread_func(void *threadarg)
#else
void *writef_thread_func(void *threadarg)
#endif
{
    struct thread_data *my_data;
    long taskid;
    int  rc = 0, i = 0, k, iWFBN = 1;
    size_t len;
#ifdef _WIN32
    __int64 write_len = 0;
    int  write_ret1;
#else
    off_t write_len = 0;
    ssize_t write_ret1;
#endif

    my_data = (struct thread_data *) threadarg;
    taskid = my_data->thread_id;
    DBGPRT3("@writef_thread_func(), taskid=%ld, fd=%d\n", taskid, my_data->fd);

    while(1)
    {
#ifdef _WIN32
#else
        DBGPRT3("@writef_thread_func(): thread %ld locking mutex.\n", taskid);
        pthread_mutex_lock(&bufo_mutex);
#endif
        while (1)
        {
#ifdef _WIN32
        EnterCriticalSection( &g_cs_bufo );
#else
#endif
            for(i = 0, k = -1; i < giTN; i++)
            {
//                DBGPRT3("@writef_thread_func(): thread %ld, giWriteBufStat[%d]=%d, giFileBlockNo[%d]=%d\n", taskid, i, giWriteBufStat[i], i, giFileBlockNo[i]);
                DBGPRT3("@writef_thread_func(): thread %ld, gtWriteBuf[%d].bufstat=%d, gtWriteBuf[%d].fileBlockNo=%d\n", taskid, i, gtWriteBuf[i].bufstat, i, gtWriteBuf[i].fileBlockNo);
//                if(giWriteBufStat[i] == 4 && giFileBlockNo[i] == iWFBN)
                if(gtWriteBuf[i].bufstat == 4 && gtWriteBuf[i].fileBlockNo == iWFBN)
                {
                    k = i;
                    break;
                }
            }
#ifdef _WIN32
            if(k >= 0)
            {
//                giWriteBufStat[k] = 5;
                gtWriteBuf[k].bufstat = 5;
                LeaveCriticalSection( &g_cs_bufo );
                break;
            }
            LeaveCriticalSection( &g_cs_bufo );
#else
            if(k >= 0) break;
#endif
            DBGPRT3("=== writef_thread_func(): thread %ld, Going into wait, giRFBN=%d, iWFBN=%d ...\n", taskid, giRFBN, iWFBN);
            if((rtf_exit == 1 || eof_err != 0) && giRFBN == iWFBN) break;
#ifdef _WIN32
            DBGPRT3("=== writef_thread_func(): thread %ld, before WaitForSingleObject...\n", taskid);
            WaitForSingleObject( hEvent_bufo, INFINITE );
            DBGPRT3("*** writef_thread_func(): thread %ld, after WaitForSingleObject.\n", taskid);
#else
            pthread_cond_wait(&bufo_threshold_cv, &bufo_mutex);
            DBGPRT3("*** writef_thread_func(): thread %ld Condition signal received.\n", taskid);
#endif
        }
#ifdef _WIN32
#else
//        giWriteBufStat[k] = 5;
        gtWriteBuf[k].bufstat = 5;
        pthread_mutex_unlock(&bufo_mutex);
#endif
//        len = bufolen[k];
        len = gtWriteBuf[k].buflen;
        DBGPRT3("@writef_thread_func(): thread %ld Writing file, k=%d, len=%d ...\n", taskid, k, len);
#ifdef _WIN32
//        write_ret1 = _write(my_data->fd, bufo[k], len);
        write_ret1 = _write(my_data->fd, gtWriteBuf[k].bufptr, len);
#else
//        write_ret1 = write(my_data->fd, bufo[k], len);
        write_ret1 = write(my_data->fd, gtWriteBuf[k].bufptr, len);
#endif
        write_len += write_ret1;
        if(write_ret1 != len)
        {
#ifdef _WIN32
            printf("Error: _write, GetLastError()=%d, errno=%d(%s)\n", GetLastError(), errno, strerror(errno));
#else
            printf("Error: write失败, errno=%d(%s)\n", errno, strerror(errno));
#endif
            rc = -1;
            goto end;
        }
#ifdef _WIN32
        _commit(my_data->fd);
#endif
//        giWriteBufStat[k] = 0;
        gtWriteBuf[k].bufstat = 0;

//        giWriteBufStat[giWriteBufIdx % giTN] = 0;
//        DBGPRT3("@writef_thread_func(): thread %ld pthread_cond_signal, giWriteBufIdx=%d ...\n", taskid, giWriteBufIdx);
//        pthread_cond_signal(&g_condv_w[giWriteBufIdx % giTN]);
#ifdef _WIN32
        SetEvent(hEvent_w[k]);
#else
        DBGPRT3("@writef_thread_func(): thread %ld pthread_cond_signal, write_ret1=%d ...\n", taskid, write_ret1);
        pthread_cond_signal(&g_condv_w[k]);
#endif
end:
//        DBGPRT3("writef_thread_func(): thread %ld Unlocking mutex. buf1ostat = %d, buf2ostat = %d, rc = %d, eof_err = %d, rtf_exit = %d, ptf_exit = %d\n", taskid, buf1ostat, buf2ostat, rc, eof_err, rtf_exit, ptf_exit);
        //pthread_mutex_unlock(&bufo_mutex);
        if(rc < 0) break;
        //if(buf1ostat == 2 || buf2ostat == 2) continue;
        //if(eof_err != 0) break;
//        if(rtf_exit != 0 && ptf_exit != 0) break;
        if((rtf_exit == 1 || eof_err != 0) && giRFBN == iWFBN) break;
        iWFBN++;
    }
    DBGPRT3("writef_thread_func(): thread %ld End, rc = %d, write_len = %ld.\n", taskid, rc, write_len);
    wtf_exit = 1;
    for(i = 0; i < giTN; i++)
    {
#ifdef _WIN32
        SetEvent(hEvent_r[i]);
#else
        pthread_cond_signal(&g_condv_r[i]);
#endif
    }
    my_data->err = rc;
#ifdef _WIN32
    _endthreadex( 0 );
    return 0;
#else
    pthread_exit(NULL);
#endif
}

int  main(int argc, char *argv[])
{
#ifdef _WIN32
    HANDLE  hThread[MAX_PT_NUM];
    unsigned  uiThreadID[MAX_PT_NUM];
#else
  pthread_t threads[MAX_PT_NUM];
#endif
  int  rc, *taskids[MAX_PT_NUM];
  int  i, t = 0;
  int  fd1, fd2;
    struct timeval begin, current;
    double elapsed;

    if(argc < 3)
    {
        printf("多线程练习程序,将输入文件每行变大写,存为输出文件\n");
        printf("用法:%s 输入文件名 输出文件名 [maxBufLen(MB) [maxThreadNum]]\n", argv[0]);
        return 1;
    }
    if(argc >= 4) buf_len = atoi(argv[3]) * 1024 * 1024L;
    if(argc >= 5) giTN = atoi(argv[4]);
    gettimeofday(&begin, 0);
#ifdef _WIN32
    if((fd1 = _open(argv[1], _O_BINARY)) == -1)
#else
    if((fd1 = open64(argv[1], O_RDONLY | O_LARGEFILE)) == -1)
#endif
    {
        printf("Open file error!\n");
        return -1;
    }
#ifdef _WIN32
    if((fd2 = _open(argv[2], _O_BINARY | _O_WRONLY | _O_CREAT, _S_IREAD | _S_IWRITE)) == -1)
#else
    if((fd2 = open64(argv[2], O_CREAT | O_WRONLY | O_TRUNC | O_LARGEFILE,
                     S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)) == -1)
#endif
    {
        printf("Create file error!\n");
#ifdef _WIN32
        _close(fd1);
#else
        close(fd1);
#endif
        return -2;
    }
    DBGPRT("--- fd1=%d, fd2=%d\n", fd1, fd2);
  srand(time(0));
/*
  memset(giReadBufStat, 0x00, sizeof(giReadBufStat));
  memset(giWriteBufStat, 0x00, sizeof(giWriteBufStat));
  memset(giFileBlockNo, 0x00, sizeof(giFileBlockNo));
*/
  memset(gtReadBuf, 0x00, sizeof(gtReadBuf));
  memset(gtWriteBuf, 0x00, sizeof(gtWriteBuf));
  
#ifdef _WIN32
    InitializeCriticalSection( &g_cs_bufi );
    InitializeCriticalSection( &g_cs_bufo );
    hEvent_bufi = CreateEvent( NULL, FALSE, FALSE, NULL );
    hEvent_bufo = CreateEvent( NULL, FALSE, FALSE, NULL );
#else
#endif

    for(i = 0; i < giTN; i++)
    {
//        if((bufi[i] = (char *)malloc(buf_len + 1)) == NULL)
        if((gtReadBuf[i].bufptr = (char *)malloc(buf_len + 1)) == NULL)
        {
            printf("Insufficient memory available for bufi[%d]\n", i);
            return -1;
        }
//        if((bufo[i] = (char *)malloc(buf_len + 1)) == NULL)
        if((gtWriteBuf[i].bufptr = (char *)malloc(buf_len + 1)) == NULL)
        {
            printf("Insufficient memory available for bufo[%d]\n", i);
            return -1;
        }
#ifdef _WIN32
    hEvent_r[i] = CreateEvent( NULL, FALSE, FALSE, NULL );
    hEvent_w[i] = CreateEvent( NULL, FALSE, FALSE, NULL );
#else
/*
        g_mutex_r[i] = PTHREAD_MUTEX_INITIALIZER;
        g_mutex_w[i] = PTHREAD_MUTEX_INITIALIZER;
        g_condv_r[i] = PTHREAD_COND_INITIALIZER;
        g_condv_w[i] = PTHREAD_COND_INITIALIZER;
*/
        pthread_mutex_init(&g_mutex_r[i], NULL);
        pthread_mutex_init(&g_mutex_w[i], NULL);
        pthread_cond_init (&g_condv_r[i], NULL);
        pthread_cond_init (&g_condv_w[i], NULL);
#endif
    }

    t = giTN+1;
    thread_data_array[t].thread_id = t;
    thread_data_array[t].err = 0;
    thread_data_array[t].fd = fd2;
    DBGPRT("Creating thread %d\n", t);
#ifdef _WIN32
    hThread[t] = (HANDLE)_beginthreadex( NULL,    // security  
                                   0,        // stack size  
                                   &writef_thread_func,  
                                   (void *) &thread_data_array[t],    // arg list  
                                   0,        //initflag
                                   &uiThreadID[t] );  
    if ( hThread[t] == 0 )
    {
        printf("ERROR: Failed to create thread: _beginthreadex(writef_thread_func)\n");
        exit(-1);
    }
#else
    rc = pthread_create(&threads[t], NULL, writef_thread_func, (void *) &thread_data_array[t]);
    if (rc)
    {
        printf("ERROR: return code from pthread_create(writef_thread_func) is %d\n", rc);
        exit(-1);
    }
#endif

    for(t = 0; t < giTN; t++)
    {
        thread_data_array[t].thread_id = t;
        thread_data_array[t].err = 0;
        thread_data_array[t].fd = 0;
        DBGPRT("Creating thread %d\n", t);
#ifdef _WIN32
    hThread[t] = (HANDLE)_beginthreadex( NULL,    // security  
                                   0,        // stack size  
                                   &proc_thread_func,  
                                   (void *) &thread_data_array[t],    // arg list  
                                   0,        //initflag
                                   &uiThreadID[t] );  
    if ( hThread[t] == 0 )
    {
        printf("ERROR: Failed to create thread: _beginthreadex(proc_thread_func)\n");
        exit(-1);
    }
#else
        rc = pthread_create(&threads[t], NULL, proc_thread_func, (void *) &thread_data_array[t]);
        if (rc)
        {
            printf("ERROR: return code from pthread_create(proc_thread_func) is %d\n", rc);
            exit(-1);
        }
#endif
    }
    //printf("t= %d\n", t);

    thread_data_array[t].thread_id = t;
    thread_data_array[t].err = 0;
    thread_data_array[t].fd = fd1;
    DBGPRT("Creating thread %d\n", t);
#ifdef _WIN32
    hThread[t] = (HANDLE)_beginthreadex( NULL,    // security  
                                   0,        // stack size  
                                   &readf_thread_func,  
                                   (void *) &thread_data_array[t],    // arg list  
                                   0,        //initflag
                                   &uiThreadID[t] );  
    if ( hThread[t] == 0 )
    {
        printf("ERROR: Failed to create thread: _beginthreadex(readf_thread_func)\n");
        exit(-1);
    }
#else
    rc = pthread_create(&threads[t], NULL, readf_thread_func, (void *) &thread_data_array[t]);
    if (rc)
    {
        printf("ERROR: return code from pthread_create(readf_thread_func) is %d\n", rc);
        exit(-1);
    }
#endif

    /* Wait for all threads to complete */
#ifdef _WIN32
    for (i = 0; i < giTN+2; i++)
    {
        DBGPRT("--- WaitForSingleObject(hThread[%d])...\n", i);
        WaitForSingleObject( hThread[i], INFINITE );
    }
#else
    DBGPRT("--- pthread_join()...\n");
    for (i = 0; i < giTN+2; i++)
    {
        pthread_join(threads[i], NULL);
        //pthread_join(threads[i], (void **)&ret_val);
        //printf("i = %d, pthread_join retval = %d\n", i, *(int *) retval);
        //printf("i = %d, pthread_join retval = %d\n", i, (int)ret_val);
        DBGPRT("=== After pthread_join(), i = %d, err = %d\n", i, thread_data_array[i].err);
    }
#endif

    /* Clean up and exit */
#ifdef _WIN32
    DeleteCriticalSection( &g_cs_bufi );
    DeleteCriticalSection( &g_cs_bufo );

#else
    for(i = 0; i < giTN; i++)
    {
        pthread_mutex_destroy(&g_mutex_r[i]);
        pthread_mutex_destroy(&g_mutex_w[i]);
        pthread_cond_destroy(&g_condv_r[i]);
        pthread_cond_destroy(&g_condv_w[i]);
    }
    pthread_mutex_destroy(&bufi_mutex);
    pthread_mutex_destroy(&bufo_mutex);
    pthread_cond_destroy(&bufi_threshold_cv);
    pthread_cond_destroy(&bufo_threshold_cv);
#endif

    gettimeofday(&current, (struct timezone*) 0);
    elapsed = (current.tv_sec - begin.tv_sec) +
              ((current.tv_usec - begin.tv_usec) / 1000000.0F);
    printf("--- 耗时 %f 秒\n", elapsed);
#ifdef _WIN32
    // Destroy the thread object.
    for (i = 0; i < giTN+2; i++)
    {
        CloseHandle( hThread[i] );
    }
#else
    pthread_exit (NULL);
#endif
    return 0;
}
2019-06-27 14:51
NB2011
Rank: 1
等 级:新手上路
帖 子:8
专家分:5
注 册:2011-7-17
收藏
得分:0 
我看着那些生产者、消费者问题的文章,还是不知道该如何解决我这个实际问题 。
各位高手请拿我的程序当靶子,狠狠批评并指正!只有这样我觉得才能学到真东西。谢谢!

NewBie
2019-06-27 18:00
快速回复:请教VC6 /LINUX GCC 多线程处理文本文件的问题
数据加载中...
 
   



关于我们 | 广告合作 | 编程中国 | 清除Cookies | TOP | 手机版

编程中国 版权所有,并保留所有权利。
Powered by Discuz, Processed in 0.152472 second(s), 9 queries.
Copyright©2004-2024, BCCN.NET, All Rights Reserved