当前位置:首页 >> 服务器

linux多线程编程(五)

线程

  线程是计算机中独立运行的最小单位,运行时占用很少的系统资源。可以把线程看成是操作系统分配CPU时间的基本单元。一个进程可以拥有一个至多个线程。它线程在进程内部共享地址空间、打开的文件描述符等资源。同时线程也有其私有的数据信息,包括:线程号、寄存器(程序计数器和堆栈指针)、堆栈、信号掩码、优先级、线程私有存储空间。

  为什么有了进程的概念后,还要再引入线程呢?使用多线程到底有哪些好处?什么的系统应该选用多线程?

  使用多线程的理由之一是和进程相比,它是一种非常“节俭”的多任务操作方式。我们知道,在Linux系统下,启动一个新的进程必须分配给它独立的地址空间,建立众多的数据表来维护它的代码段、堆栈段和数据段,这是一种“昂贵”的多任务工作方式。而运行于一个进程中的多个线程,它们彼此之间使用相同的地址空间,共享大部分数据,启动一个线程所花费的空间远远小于启动一个进程所花费的空间,而且,线程间彼此切换所需的时间也远远小于进程间切换所需要的时间。据统计,总的说来,一个进程的开销大约是一个线程开销的30倍左右,当然,在具体的系统上,这个数据可能会有较大的区别。

  使用多线程的理由之二是线程间方便的通信机制。对不同进程来说,它们具有独立的数据空间,要进行数据的传递只能通过通信的方式进行,这种方式不仅费时,而且很不方便。线程则不然,由于同一进程下的线程之间共享数据空间,所以一个线程的数据可以直接为其它线程所用,这不仅快捷,而且方便。当然,数据的共享也带来其他一些问题,有的变量不能同时被两个线程所修改,有的子程序中声明为static的数据更有可能给多线程程序带来灾难性的打击,这些正是编写多线程程序时最需要注意的地方。

  除了以上所说的优点外,不和进程比较,多线程程序作为一种多任务、并发的工作方式,当然有以下的优点:

  1)提高应用程序响应。这对图形界面的程序尤其有意义,当一个操作耗时很长时,整个系统都会等待这个操作,此时程序不会响应键盘、鼠标、菜单的操作,而使用多线程技术,将耗时长的操作(time consuming)置于一个新的线程,可以避免这种尴尬的情况。

  2)使多CPU系统更加有效。操作系统会保证当线程数不大于CPU数目时,不同的线程运行于不同的CPU上。

  3)改善程序结构。一个既长又复杂的进程可以考虑分为多个线程,成为几个独立或半独立的运行部分,这样的程序会利于理解和修改。

  线程分类

  线程按照其调度者可以分为用户级线程和核心级线程两种。

  (1)用户级线程

  用户级线程主要解决的是上下文切换的问题,它的调度算法和调度过程全部由用户自行选择决定,在运行时不需要特定的内核支持。在这里,操作系统往往会提供一个用户空间的线程库,该线程库提供了线程的创建、调度、撤销等功能,而内核仍然仅对进程进行管理。如果一个进程中的某一个线程调用了一个阻塞的系统调用,那么该进程包括该进程中的其他所有线程也同时被阻塞。这种用户级线程的主要缺点是在一个进程中的多个线程的调度中无法发挥多处理器的优势。

  (2)核心级线程

  这种线程允许不同进程中的线程按照同一相对优先调度方法进行调度,这样就可以发挥多处理器的并发优势。

  现在大多数系统都采用用户级线程与核心级线程并存的方法。一个用户级线程可以对应一个或几个核心级线程,也就是“一对一”或“多对一”模型。这样既可满足多处理机系统的需要,也可以最大限度地减少调度开销。

  线程创建的Linux实现

  我们知道,Linux的线程实现是在核外进行的,核内提供的是创建进程的接口do_fork()。内核提供了两个系统调用clone()和fork(),最终都用不同的参数调用do_fork()核内API。当然,要想实现线程,没有核心对多进程(其实是轻量级进程)共享数据段的支持是不行的,因此,do_fork()提供了很多参数,包括CLONE_VM(共享内存空间)、CLONE_FS(共享文件系统信息)、 CLONE_FILES(共享文件描述符表)、CLONE_SIGHAND(共享信号句柄表)和CLONE_PID(共享进程ID,仅对核内进程,即0号进程有效)。当使用fork系统调用时,内核调用do_fork()不使用任何共享属性,进程拥有独立的运行环境,而使用 pthread_create()来创建线程时,则最终设置了所有这些属性来调用__clone(),而这些参数又全部传给核内的do_fork(),从而创建的“进程”拥有共享的运行环境,只有栈是独立的,由__clone()传入。

  Linux线程在核内是以轻量级进程的形式存在的,拥有独立的进程表项,而所有的创建、同步、删除等操作都在核外pthread库中进行。pthread 库使用一个管理线程(__pthread_manager(),每个进程独立且唯一)来管理线程的创建和终止,为线程分配线程ID,发送线程相关的信号(比如Cancel),而主线程(pthread_create())的调用者则通过管道将请求信息传给管理线程。

  多线程编程

  1、线程的创建和退出

  pthread_create 线程创建函数

int pthread_create (pthread_t * thread_id,__const pthread_attr_t * __attr,void *(*__start_routine) (void *),void *__restrict __arg);

  线程创建函数第一个参数为指向线程标识符的指针,第二个参数用来设置线程属性,第三个参数是线程运行函数的起始地址,最后一个参数是运行函数的参数。这里,我们的函数thread 不需要参数,所以最后一个参数设为空指针。第二个参数我们也设为空指针,这样将生成默认属性的线程。当创建线程成功时,函数返回0,若不为0 则说明创建线程失败,常见的错误返回代码为EAGAIN 和EINVAL。前者表示系统限制创建新的线程,例如线程数目过多了;后者表示第二个参数代表的线程属性值非法。创建线程成功后,新创建的线程则运行参数三和参数四确定的函数,原来的线程则继续运行下一行代码。

  pthread_join 函数,来等待一个线程的结束。

  函数原型为:int pthread_join (pthread_t __th, void **__thread_return)

  第一个参数为被等待的线程标识符,第二个参数为一个用户定义的指针,它可以用来存储被等待线程的返回值。这个函数是一个线程阻塞的函数,调用它的函数将一直等待到被等待的线程结束为止,当函数返回时,被等待线程的资源被收回。线程只能被一个线程等待终止,并且应处于joinable状态(非detached)。

  pthread_exit 函数

  一个线程的结束有两种途径,一种是线程运行的函数结束了,调用它的线程也就结束了;

  另一种方式是通过函数pthread_exit 来实现。它的函数原型为:void pthread_exit (void *__retval)唯一的参数是函数的返回代码,只要pthread_join 中的第二个参数thread_return 不是NULL,这个值将被传递给thread_return。最后要说明的是,一个线程不能被多个线程等待,否则第一个接收到信号的线程成功返回,其余调用pthread_join 的线程则返回错误代码ESRCH。

  2、线程属性

  pthread_create函数的第二个参数线程的属性。将该值设为NULL,也就是采用默认属性,线程的多项属性都是可以更改的。这些属性主要包括绑定属性、分离属性、堆栈地址、堆栈大小、优先级。其中系统默认的属性为非绑定、非分离、缺省1M 的堆栈、与父进程同样级别的优先级。下面首先对绑定属性和分离属性的基本概念进行讲解。

  绑定属性:Linux中采用“一对一”的线程机制,也就是一个用户线程对应一个内核线程。绑定属性就是指一个用户线程固定地分配给一个内核线程,因为CPU时间片的调度是面向内核线程 (也就是轻量级进程)的,因此具有绑定属性的线程可以保证在需要的时候总有一个内核线程与之对应。而与之相对的非绑定属性就是指用户线程和内核线程的关系不是始终固定的,而是由系统来控制分配的。

  分离属性:分离属性是用来决定一个线程以什么样的方式来终止自己。在非分离情况下,当一个线程结束时,它所占用的系统资源并没有被释放,也就是没有真正的终止。只有当pthread_join()函数返回时,创建的线程才能释放自己占用的系统资源。而在分离属性情况下,一个线程结束时立即释放它所占有的系统资源。

  这里要注意的一点是,如果设置一个线程的分离属性,而这个线程运行又非常快,那么它很可能在pthread_create 函数返回之前就终止了,它终止以后就可能将线程号和系统资源移交给其他的线程使用,这时调用pthread_create 的线程就得到了错误的线程号。

  设置绑定属性:

  int pthread_attr_init(pthread_attr_t *attr)
  int pthread_attr_setscope(pthread_attr_t *attr, int scope)
  int pthread_attr_getscope(pthread_attr_t *tattr, int *scope)

  scope:PTHREAD_SCOPE_SYSTEM:绑定,此线程与系统中所有的线程竞争

  PTHREAD_SCOPE_PROCESS:非绑定,此线程与进程中的其他线程竞争

  设置分离属性:

  int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate)
  int pthread_attr_getdetachstate(const pthread_attr_t *tattr,int *detachstate)

  detachstate PTHREAD_CREATE_DETACHED:分离

  PTHREAD _CREATE_JOINABLE:非分离

  设置调度策略:

  int pthread_attr_setschedpolicy(pthread_attr_t * tattr, int policy)
  int pthread_attr_getschedpolicy(pthread_attr_t * tattr, int *policy)

  policy SCHED_FIFO:先入先出

  SCHED_RR:循环

  SCHED_OTHER:实现定义的方法

  设置优先级:

  int pthread_attr_setschedparam (pthread_attr_t *attr, struct sched_param *param)
  int pthread_attr_getschedparam (pthread_attr_t *attr, struct sched_param *param)

  3、线程访问控制

  1)互斥锁(mutex)

  通过锁机制实现线程间的同步。同一时刻只允许一个线程执行一个关键部分的代码。

int pthread_mutex_init(pthread_mutex_t *mutex,const pthread_mutex_attr_t *mutexattr);
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
int pthread_mutex_destroy(pthread_mutex_t *mutex);

  (1)先初始化锁init()或静态赋值pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIER

  (2)加锁,lock,trylock,lock阻塞等待锁,trylock立即返回EBUSY

  (3)解锁,unlock需满足是加锁状态,且由加锁线程解锁

  (4)清除锁,destroy(此时锁必需unlock,否则返回EBUSY)

  mutex 分为递归(recursive) 和非递归(non-recursive)两种,这是POSIX 的叫法,另外的名字是可重入(Reentrant) 与非可重入。这两种mutex 作为线程间(inter-thread) 的同步工具时没有区别,它们的惟一区别在于:同一个线程可以重复对recursive mutex 加锁,但是不能重复对non-recursive mutex 加锁。

  首选非递归mutex,绝对不是为了性能,而是为了体现设计意图。non-recursive 和recursive 的性能差别其实不大,因为少用一个计数器,前者略快一点点而已。在同一个线程里多次对non-recursive mutex 加锁会立刻导致死锁,我认为这是它的优点,能帮助我们思考代码对锁的期求,并且及早(在编码阶段)发现问题。毫无疑问recursive mutex 使用起来要方便一些,因为不用考虑一个线程会自己把自己给锁死了,我猜这也是Java 和Windows 默认提供recursive mutex 的原因。(Java 语言自带的intrinsic lock 是可重入的,它的concurrent 库里提供ReentrantLock,Windows的CRITICAL_SECTION 也是可重入的。似乎它们都不提供轻量级的non-recursive mutex。)

  2)条件变量(cond)

  利用线程间共享的全局变量进行同步的一种机制。

int pthread_cond_init(pthread_cond_t *cond,pthread_condattr_t *cond_attr);   
int pthread_cond_wait(pthread_cond_t *cond,pthread_mutex_t *mutex);
int pthread_cond_timedwait(pthread_cond_t *cond,pthread_mutex_t *mutex,const timespec *abstime);
int pthread_cond_destroy(pthread_cond_t *cond); 
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond); //解除所有线程的阻塞

  (1)初始化. init()或者pthread_cond_t cond=PTHREAD_COND_INITIALIER;属性置为NULL

  (2)等待条件成立. pthread_cond_wait,pthread_cond_timedwait.

  wait()释放锁,并阻塞等待条件变量为真

  timedwait()设置等待时间,仍未signal,返回ETIMEOUT(加锁保证只有一个线程wait)

  (3)激活条件变量:pthread_cond_signal,pthread_cond_broadcast(激活所有等待线程)

  (4)清除条件变量:destroy; 无线程等待,否则返回EBUSY

  int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
  int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);

  一定要在mutex的锁定区域内使用。

  调用 pthread_cond_signal() 释放被条件阻塞的线程时,如果没有任何线程基于条件变量阻塞,则调用pthread_cond_signal()不起作用。而对于 Windows,当调用 SetEvent 触发 Auto-reset 的 Event 条件时,如果没有被条件阻塞的线程,那么此函数仍然起作用,条件变量会处于触发状态。

  使用条件变量实现“生产者消费者问题”:

  #include<stdio.h>
#include<stdlib.h>
#include<time.h>
#include"pthread.h"

  #define BUFFER_SIZE 16

  struct prodcons
{
    int buffer[BUFFER_SIZE];
    pthread_mutex_t lock; //mutex ensuring exclusive access to buffer
    int readpos,writepos; //position for reading and writing
    pthread_cond_t notempty; //signal when buffer is not empty
    pthread_cond_t notfull; //signal when buffer is not full
};

  //initialize a buffer
void init(struct prodcons* b)
{
   pthread_mutex_init(&b->lock,NULL);
   pthread_cond_init(&b->notempty,NULL);
   pthread_cond_init(&b->notfull,NULL);
   b->readpos=0;
   b->writepos=0;
}

  //store an integer in the buffer
void put(struct prodcons* b, int data)
{
   pthread_mutex_lock(&b->lock);
   //wait until buffer is not full
   while((b->writepos+1)%BUFFER_SIZE==b->readpos)
   {
    printf("wait for not full\n");
    pthread_cond_wait(&b->notfull,&b->lock);
   }   
   
   b->buffer[b->writepos]=data;
   b->writepos++;
   pthread_cond_signal(&b->notempty); //signal buffer is not empty
   pthread_mutex_unlock(&b->lock);
}

  //read and remove an integer from the buffer
int get(struct prodcons* b)
{
   int data;
   pthread_mutex_lock(&b->lock);
   //wait until buffer is not empty
   while(b->writepos==b->readpos)
   {
    printf("wait for not empty\n");
    pthread_cond_wait(&b->notempty,&b->lock);
   }   
   
   data=b->buffer[b->readpos];
   b->readpos++;
   if(b->readpos>=BUFFER_SIZE) b->readpos=0;
   pthread_cond_signal(&b->notfull); //signal buffer is not full
   pthread_mutex_unlock(&b->lock);
   return data;
}

  #define OVER -1

  struct prodcons buffer;

  void * producer(void * data)
{
   int n;
   for(n=0;n<1000;++n)
   {
    printf("put-->%d\n",n);
    put(&buffer,n);
   }
   put(&buffer,OVER);
   printf("producer stopped\n");
   return NULL;
}

  void * consumer(void * data)
{
   int n;
   while(1)
   {
    int d=get(&buffer);
    if(d==OVER) break;
    printf("%d-->get\n",d);
   }
   printf("consumer stopped\n");
   return NULL;
}

  int main()
{
  pthread_t tha,thb;
  void * retval;
  
  init(&buffer);
  pthread_creare(&tha,NULL,producer,0);
  pthread_creare(&thb,NULL,consumer,0);
  
  pthread_join(tha,&retval);
  pthread_join(thb,&retval);
  
  return 0;
}

  PS:如果遇到如下问题:加个编译条件'-pthread'

  prodcons.c:(.text+0x2ab): undefined reference to `pthread_create'
  prodcons.c:(.text+0x2bd): undefined reference to `pthread_join'

  3)信号量

  如同进程一样,线程也可以通过信号量来实现通信,虽然是轻量级的。

  信号量函数的名字都以"sem_"打头。线程使用的基本信号量函数有四个。

  #include <semaphore.h>

  int sem_init(sem_t *sem , int pshared, unsigned int value);

  这是对由sem指定的信号量进行初始化,设置好它的共享选项(linux只支持为0,即表示它是当前进程的局部信号量),然后给它一个初始值VALUE。

  两个原子操作函数:

  int sem_wait(sem_t *sem);

  int sem_post(sem_t *sem);

  这两个函数都要用一个由sem_init调用初始化的信号量对象的指针做参数。

  sem_post:给信号量的值加1;

  sem_wait:给信号量减1;对一个值为0的信号量调用sem_wait,这个函数将会等待直到有其它线程使它不再是0为止。

  int sem_destroy(sem_t *sem);

  这个函数的作用是再我们用完信号量后都它进行清理。归还自己占有的一切资源。

  用信号量的方法实现生产者消费者

  这里使用4个信号量,其中两个信号量occupied和empty分别用于解决生产者和消费者线程之间的同步问题,pmut和cmut是用于这两个线程之间的互斥问题。其中empty初始化为N(有界缓区的空间元数),occupied初始化为0,pmut和cmut初始化为1。

  typedef struct 
{
 char buf[BSIZE];
 sem_t occupied;
 sem_t empty;
 int nextin;
 int nextout;
 sem_t pmut;
 sem_t cmut;
}buffer_t;

  buffer_t buffer;

  void init(buffer_t buffer)
{
 sem_init(&buffer.occupied, 0, 0);
 sem_init(&buffer.empty,0, BSIZE);
 sem_init(&buffer.pmut, 0, 1);
 sem_init(&buffer.cmut, 0, 1);
 buffer.nextin = buffer.nextout = 0;
}

  void producer(buffer_t *b, char item) 
{
 sem_wait(&b->empty);
 sem_wait(&b->pmut);
 b->buf[b->nextin] = item;
 b->nextin++;
 b->nextin %= BSIZE;
 sem_post(&b->pmut);
 sem_post(&b->occupied);
}

  char consumer(buffer_t *b) 
{
 char item;
 sem_wait(&b->occupied);
 sem_wait(&b->cmut);
 item = b->buf[b->nextout];
 b->nextout++;
 b->nextout %= BSIZE;
 sem_post(&b->cmut);
 sem_post(&b->empty);
 return(item);
}