Sources/aliyun-log-c-sdk/include/log_multi_thread.h (244 lines of code) (raw):

#ifndef LOG_MULTI_THREAD_UTIL_H #define LOG_MULTI_THREAD_UTIL_H #include "log_inner_include.h" //不同操作系统资源相关的工具宏定义 #ifdef WIN32 //临界区资源 #define CRITICALSECTION LPCRITICAL_SECTION #define INVALID_CRITSECT NULL /** ******************************************************************** * 创建互斥锁 ******************************************************************** */ static inline CRITICALSECTION CreateCriticalSection() { CRITICALSECTION cs = (CRITICALSECTION)malloc(sizeof(RTL_CRITICAL_SECTION)); InitializeCriticalSection(cs); return cs; } /** ******************************************************************** * 删除互斥锁 ******************************************************************** */ static inline void ReleaseCriticalSection(CRITICALSECTION cs) { if (cs != INVALID_CRITSECT) { DeleteCriticalSection(cs); free(cs); } } /// * @brief 加锁 #define CS_ENTER(cs) EnterCriticalSection(cs) /// * @brief 解锁 #define CS_LEAVE(cs) LeaveCriticalSection(cs) /// * @brief 互斥锁 #define MUTEX CRITICAL_SECTION /// * @brief 加锁 #define MUTEX_LOCK(mutex) EnterCriticalSection(&mutex) /// * @brief 解锁 #define MUTEX_UNLOCK(mutex) LeaveCriticalSection(&mutex) /// * @brief 互斥锁初始化 #define MUTEX_INIT(mutex) InitializeCriticalSection(&mutex) /// * @brief 互斥锁销毁 #define MUTEX_DESTROY(mutex) ReleaseCriticalSection(&mutex) //信号量资源 /// * @brief 信号量 typedef HANDLE SEMA; /// * @brief 等待信号量一定时间 #define SEMA_WAIT_TIME(sema, delay) WaitForSingleObject(sema, delay) /// * @brief 一直阻塞地进行等待信号量 #define SEMA_WAIT(sema) WaitForSingleObject(sema, INFINITE) /// * @brief 释放信号量 #define SEMA_POST(sema) ReleaseSemaphore(sema, 1, NULL) /// * @brief 尝试获取一个信号量 #define SEMA_TRYWAIT(sema) WaitForSingleObject(sema, 0) /// * @brief 销毁信号量 #define SEMA_DESTROY(sema) CloseHandle(sema) /// * @brief 初始化信号量, 输入的为:信号量的最大值,初始信号量个数 #define SEMA_INIT(sema, initCount, maxCount) sema = CreateSemaphore(NULL, initCount, maxCount, NULL) /// * @brief 初始一个带有名称的信号量,用于多进程交互 #define SEMA_INIT_NAME(sema, initCount, maxCount, semaName) sema = CreateSemaphore(NULL, initCount, maxCount, semaName) /// * @brief 信号量等待超时 #define SEMA_WAIT_TIMEOUT WAIT_TIMEOUT /// * @brief 等待到信号量 #define SEMA_WAIT_OK WAIT_OBJECT_0 //条件量 typedef struct windows_event{ HANDLE event; }*COND; //typedef PRTL_CONDITION_VARIABLE COND; typedef int COND_WAIT_T; #define COND_WAIT_OK 0 #define COND_WAIT_TIMEOUT ETIMEDOUT #define INVALID_COND NULL static inline COND CreateCond() { COND cond = NULL; if (!(cond = (COND)malloc(sizeof(struct windows_event)))) return cond; if ((cond->event = CreateEvent(NULL, FALSE, FALSE, NULL)) == NULL) { free(cond); return NULL; } return cond; } static inline void DeleteCond(COND cond) { if (cond != INVALID_COND) { CloseHandle(cond->event); free(cond); } } #define COND_SIGNAL(cond) COND_WAKE(cond) static inline COND_WAIT_T COND_WAIT_TIME(COND cond, CRITICALSECTION cs, int32_t waitMs) { if (cond == INVALID_COND) { return EINVAL; } DWORD ret; DWORD startTime, endTime, totalWaitMs, remainMs; int result = -1; LeaveCriticalSection(cs); ret = WaitForSingleObject((HANDLE)cond->event, waitMs); if (ret == WAIT_TIMEOUT) { result = ETIMEDOUT; } else if (ret != WAIT_OBJECT_0) { result = EINVAL; } EnterCriticalSection(cs); return result; } static inline COND_WAIT_T COND_WAKE(COND cond) { if (cond == INVALID_COND) { return EINVAL; } if (!SetEvent((HANDLE)cond->event)) return EINVAL; return 0; } typedef HANDLE THREAD; static inline void Win32CreateThread(HANDLE* hpThread, _In_ LPTHREAD_START_ROUTINE lpStartAddress, _In_opt_ __drv_aliasesMem LPVOID lpParameter) { *hpThread = CreateThread(NULL, 0, lpStartAddress, lpParameter, 0, NULL); } #define THREAD_INIT(thread, func, param) Win32CreateThread(&thread, func, param) #define THREAD_JOIN(thread) WaitForSingleObject(thread, INFINITE) #define snprintf sprintf_s #define ATOMICINT volatile long #define ATOMICINT_INC(pAtopicInt) InterlockedIncrement(pAtopicInt) #define ATOMICINT_DEC(pAtopicInt) InterlockedDecrement(pAtopicInt) #define ATOMICINT_ADD(pAtopicInt, addVal) InterlockedAdd(pAtopicInt, addVal) #define ATOMICINT_EXCHANGEADD(pAtopicInt, addVal) InterlockedExchangeAdd(pAtopicInt, addVal) #define ATOMICINT_EXCHANGE(pAtopicInt, exchangeVal) InterlockedExchange(pAtopicInt, exchangeVal) #define ATOMICINT_COMPAREEXCAHNGE(pAtopicInt, exchangeVal, cmpVal) InterlockedCompareExchange(pAtopicInt, exchangeVal, cmpVal) #elif defined(_VXWORKS) //临界区资源 typedef SEM_ID CRITICALSECTION; #define INVALID_CRITSECT NULL static inline CRITICALSECTION CreateCriticalSection(int spinCount = 0) { CRITICALSECTION cs = semMCreate(SEM_Q_PRIORITY | SEM_DELETE_SAFE | SEM_INVERSION_SAFE); if (cs == NULL) { perror("vxworks create MUTUAL EXCLUSION SEMAPHORE failed\n"); } return cs; } static inline void ReleaseCriticalSection(CRITICALSECTION & cs) { semDelete(cs); cs = INVALID_CRITSECT; } #define CS_ENTER(cs) semTake(cs, WAIT_FOREVER) #define CS_LEAVE(cs) semGive(cs) #define MUTEX SEM_ID #define MUTEX_LOCK(mutex) semTake(mutex, WAIT_FOREVER) #define MUTEX_UNLOCK(mutex) semGive(mutex) #define MUTEX_INIT(mutex) mutex = semBCreate(SEM_Q_FIFO,SEM_FULL) #define MUTEX_DESTROY(mutex) semDelete(mutex) //信号量资源 #define SEMA SEM_ID #define SEMA_WAIT_TIME(sema,delay) semTake(sema, delay) #define SEMA_WAIT(sema) semTake(sema, WAIT_FOREVER) #define SEMA_POST(sema) semGive(sema) #define SEMA_DESTROY(sema) semDelete(sema) #define SEMA_INIT(sema, initCount, maxCount) sema = semCCreate(SEM_Q_FIFO,initCount) #define SEMA_WAIT_TIMEOUT ERROR //线程资源 #define THREADID int #define SOCKET int #define closesocket(s_) close(s_) #define SOCKET_ERROR -1 #else //临界区资源 typedef pthread_mutex_t* CRITICALSECTION; #define INVALID_CRITSECT NULL static inline CRITICALSECTION CreateCriticalSection() { CRITICALSECTION cs = (pthread_mutex_t *)malloc(sizeof(pthread_mutex_t)); assert(cs != INVALID_CRITSECT); pthread_mutex_init(cs, NULL); return cs; } static inline void ReleaseCriticalSection(CRITICALSECTION cs) { if (cs != INVALID_CRITSECT) { pthread_mutex_destroy(cs); free(cs); } } #define CS_ENTER(cs) pthread_mutex_lock(cs) #define CS_LEAVE(cs) pthread_mutex_unlock(cs) typedef pthread_cond_t* COND; typedef int COND_WAIT_T; #define COND_WAIT_OK 0 #define COND_WAIT_TIMEOUT ETIMEDOUT #define INVALID_COND NULL static inline COND CreateCond() { COND cond = (COND)malloc(sizeof(pthread_cond_t)); assert(cond != INVALID_CRITSECT); pthread_cond_init(cond, NULL); return cond; } static inline void DeleteCond(COND cond) { if (cond != INVALID_COND) { pthread_cond_destroy(cond); free(cond); } } #define COND_SIGNAL(cond) pthread_cond_signal(cond) #define COND_SIGNAL_ALL(cond) pthread_cond_broadcast(cond) static inline COND_WAIT_T COND_WAIT_TIME(COND cond, CRITICALSECTION cs, int32_t waitMs) { struct timeval now; struct timespec outTime; gettimeofday(&now, NULL); now.tv_usec += ((waitMs) % 1000) * 1000; if (now.tv_usec > 1000000) { now.tv_usec -= 1000000; ++now.tv_sec; } outTime.tv_sec = now.tv_sec + (waitMs) / 1000; outTime.tv_nsec = now.tv_usec * 1000; return pthread_cond_timedwait(cond, cs, &outTime); } static inline int64_t GET_TIME_US() { struct timeval now; gettimeofday(&now, NULL); return (int64_t)now.tv_sec * 1000000 + now.tv_usec; } #define MUTEX pthread_mutex_t #define SEMA sem_t #define MUTEX_LOCK(mutex) pthread_mutex_lock(&mutex) #define MUTEX_UNLOCK(mutex) pthread_mutex_unlock(&mutex) #define MUTEX_INIT(mutex) pthread_mutex_init(&mutex,NULL) #define MUTEX_DESTROY(mutex) pthread_mutex_destroy(&mutex) // not supported in mac os #ifdef __linux__ static inline int sema_wait_time_(sem_t* sema, unsigned int delayMs) { struct timespec ts; struct timeval tv; gettimeofday(&tv, NULL); tv.tv_usec += (delayMs % 1000) * 1000; tv.tv_sec += delayMs / 1000; if (tv.tv_usec > 1000000) { tv.tv_usec -= 1000000; ++tv.tv_sec; } ts.tv_sec = tv.tv_sec; ts.tv_nsec = tv.tv_usec * 1000; return sem_timedwait(sema, &ts) == 0 ? 0 : ETIMEDOUT; } #define SEMA_WAIT_TIME(sema,delay) sema_wait_time_(&sema,delay) #endif #define SEMA_WAIT(sema) sem_wait(&sema) #define SEMA_POST(sema) sem_post(&sema) #define SEMA_TRYWAIT(sema) sem_trywait(&sema) #define SEMA_DESTROY(sema) sem_destroy(&sema) #define SEMA_INIT(sema, initCount, maxCount) sem_init(&sema,0,initCount) #define SEMA_INIT_NAME(sema, initCount, maxCount, semaName) sema = sem_open(semaName, O_CREAT, 0, initCount) #define WAIT_OBJECT_0 0 #define WAIT_TIMEOUT ETIMEDOUT #define SEMA_WAIT_TIMEOUT ETIMEDOUT #define SEMA_WAIT_OK 0 typedef pthread_t THREAD; #define THREAD_INIT(thread, func, param) pthread_create(&(thread), NULL, func, param) #define THREAD_JOIN(thread) pthread_join(thread, NULL) #define ATOMICINT volatile long #define ATOMICINT_INC(pAtopicInt) __sync_add_and_fetch(pAtopicInt, 1) #define ATOMICINT_DEC(pAtopicInt) __sync_add_and_fetch(pAtopicInt, -1) #define ATOMICINT_ADD(pAtopicInt, addVal) __sync_add_and_fetch(pAtopicInt, addVal) #define ATOMICINT_EXCHANGEADD(pAtopicInt, addVal) __sync_fetch_and_add(pAtopicInt, addVal) #define ATOMICINT_EXCHANGE(pAtopicInt, exchangeVal) __sync_val_compare_and_swap(pAtopicInt, *pAtopicInt, exchangeVal) #define ATOMICINT_COMPAREEXCAHNGE(pAtopicInt, exchangeVal, cmpVal) __sync_val_compare_and_swap(pAtopicInt, cmpVal, exchangeVal) typedef struct _FILETIME { unsigned long dwLowDateTime; unsigned long dwHighDateTime; } FILETIME; #endif //WIN32 #endif //LOG_MULTI_THREAD_UTIL_H