魔法系统之家下载 - 系统之家,XP系统之家,GhostXP系统,纯净版XP系统,Windows7下载之家

当前位置:首页 > 系统教程 > 服务器 > 详细页面

nginx中worker进程循环的实现示例

编辑:佚名 2021-05-28 15:32:00 来源于:魔法系统之家下载

这篇文章给大家分享的是有关nginx中worker进程循环的实现示例的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。

worker进程启动后,其首先会初始化自身运行所需要的环境,然后会进入一个循环,在该循环中不断检查是否有需要执行的事件,然后处理事件。在这个过程中,worker进程也是需要与master进程交互的,更有甚者,worker进程作为一个子进程,也是可以接收命令行指令(比如kill等)以进行相应逻辑的处理的。那么worker进程是如何与master或者命令行指令进行交互的呢?本文首先会对worker进程与master进程交互方式,以及worker进程如何处理命令行指令的流程进行讲解,然后会从源码上对worker进程交互的整个工作流程进行介绍。

1. worker与master进程交互方式

这里首先需要说明的是,无论是master还是外部命令的方式,nginx都是通过标志位的方式来处理相应的指令的,也即在接收到一个指令(无论是master还是外部命令)的时候,worker会在其回调方法中设置与该指令相对应的标志位,然后在worker进程在其自身的循环中处理完事件之后会依次检查这些标志位是否为真,是则根据该标志位的作用执行相应的逻辑。

对于worker进程与master进程的交互,其是通过socket管道的方式进行的。在ngx_process.h文件中声明了一个ngx_process_t结构体,这里我们主要关注其channel属性:

typedef struct {  // 其余属性...  
  ngx_socket_t channel[2];
} ngx_process_t;

        这里的ngx_process_t结构体的作用是存储某个进程相关的信息的,比如pid、channel、status等。每个进程中都有一个ngx_processes数组,数组元素就是这里的ngx_process_t结构体,也就是说每个进程都会通过ngx_processes数组保存其余进程的基本信息。其声明如下:

// 存储了nginx中所有的子进程数组,每个子进程都有一个对应的ngx_process_t结构体进行标记
extern ngx_process_t ngx_processes[NGX_MAX_PROCESSES];
        这里我们就可以看出,每个进程都会一个与之对应的channel数组,这个数组的长度为2,其是与master进程进行交互的管道流。在master进程创建每一个子进程的之前,都会创建一个channel数组,该数组的创建方法为:

int socketpair(int domain, int type, int protocol, int sv[2]);
        这个方法的主要作用是创建一对匿名的已经连接的套接字,也就是说,如果在一个套接字中写入数据,那么在另一个套接字中就可以接收到写入的数据。通过这种方式,如果在父进程中往管道的一边写入数据,那么在子进程就可以在另一边接收到数据,这样就可以实现父子进程的数据通信了。

        在master进程启动完子进程之后,子进程会保有master进程中相应的数据,也包括这里的channel数组。如此,master进程就可以通过channel数组实现与子进程的通信了。

2. worker处理外部命令

        对于外部命令,其本质上是通过signals数组中定义的各个信号以及回调方法进行处理的。在master进程初始化基本环境的时候,会将signals数组中指定的信号回调方法设置到对应的信号中。由于worker进程会继承master进程的基本环境,因而worker进程在接收到这里设置的信号之后,也会调用对应的回调方法。而该回调方法的主要逻辑也仅仅只是设置相应的标志位的值。关于nginx接收到信号之后如何设置对应的标志位,可以参照本人前面的文章(nginx master工作循环 超链接),这里不再赘述。

3. 源码讲解

        master进程是通过ngx_start_worker_processes()方法启动各个子进程的,如下是该方法源码:

/**
 * 启动n个worker子进程,并设置好每个子进程与master父进程之间使用socketpair
 * 系统调用建立起来的socket句柄通信机制
 */static void ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type) {
 ngx_int_t i;
 ngx_channel_t ch;
 
 ngx_memzero(&ch, sizeof(ngx_channel_t));
 ch.command = NGX_CMD_OPEN_CHANNEL;

 for (i = 0; i < n; i++) {

  // spawn是产卵的意思,这里就是生成一个子进程的意思,而该子进程所进行的事件循环就是  // ngx_worker_process_cycle()方法,这里的ngx_worker_process_cycle是worker进程处理事件的循环,  // worker进程在一个无限for循环中,不断的检查相应的事件模型中是否存在对应的事件,  // 然后将accept事件和read、write事件分开放入两个队列中,最后在事件循环中不断的处理事件  ngx_spawn_process(cycle, ngx_worker_process_cycle, 
           (void *) (intptr_t) i, "worker process", type);

  // 下面的这段代码的主要作用是将新建进程这个事件通知到其他的进程,上面的  // ch.command = NGX_CMD_OPEN_CHANNEL;中NGX_CMD_OPEN_CHANNEL表示的就是当前是新建了一个进程,  // 而ngx_process_slot存储的就是该新建进程所存放的数组位置,这里需要进行广播的原因在于,  // 每个子进程被创建后,其内存数据都是复制的父进程的,但是ngx_processes数组是每个进程都有一份的,  // 因而数组中先创建的子进程是没有后创建的子进程的数据的,但是master进程是有所有子进程的数据的,  // 因而这里master进程创建子进程之后,其就会向ngx_processes数组的每个进程的channel[0]上  // 写入当前广播的事件,也即这里的ch,通过这种方式,每个子进程接收到这个事件之后,  // 都会尝试更新其所保存的ngx_processes数据信息  ch.pid = ngx_processes[ngx_process_slot].pid;
  ch.slot = ngx_process_slot;
  ch.fd = ngx_processes[ngx_process_slot].channel[0];

  // 广播事件  ngx_pass_open_channel(cycle, &ch);
 }
}

        这里我们主要需要关注上面的启动子进程的方法调用,也即这里的ngx_spawn_process()方法,该方法的第二个参数是一个方法,在启动子进程之后,子进程就会进入该方法所指定的循环中。而在ngx_spawn_process()方法中,master进程会为当前新创建的子进程创建一个channel数组,以用于与当前子进程进行通信。如下是ngx_spawn_process()方法的源码:

ngx_pid_t ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data, char *name, ngx_int_t respawn) {
 u_long on;
 ngx_pid_t pid;
 ngx_int_t s;

 if (respawn >= 0) {
  s = respawn;

 } else {
  // 在ngx_processes数组中存储了当前创建的所有进程,而ngx_last_process则是当前当前记录的最后一个  // process在ngx_processes中的下一个位置的索引,只不过ngx_processes中记录的进程有可能有部分  // 已经失效了。当前循环就是从头开始查找是否有某个进程已经失效了,如果已经失效了,则复用该进程位置,  // 否则直接使用ngx_last_process所指向的位置  for (s = 0; s < ngx_last_process; s++) {
   if (ngx_processes[s].pid == -1) {
    break;
   }
  }

  // 这里说明所创建的进程数达到了最大限度  if (s == NGX_MAX_PROCESSES) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
          "no more than %d processes can be spawned",
          NGX_MAX_PROCESSES);
   return NGX_INVALID_PID;
  }
 }

 // NGX_PROCESS_DETACHED标志表示当前fork出来的进程与原来的父进程没有任何关系,比如进行nginx升级时, // 新生成的master进程就与原先的master进程没有关系 if (respawn != NGX_PROCESS_DETACHED) {

  /* Solaris 9 still has no AF_LOCAL */  // 这里的socketpair()方法的主要作用是生成一对套接字流,用于主进程和子进程的通信,这一对套接字会  // 存储在ngx_processes[s].channel中,本质上这个字段是一个长度为2的整型数组。在主进程和子进程  // 进行通信的之前,主进程会关闭其中一个,而子进程会关闭另一个,然后相互之间往未关闭的另一个文件描述符中  // 写入或读取数据即可实现通信。  // AF_UNIX表示当前使用的是UNIX文件形式的socket地址族  // SOCK_STREAM指定了当前套接字建立的通信方式是管道流,并且这个管道流是双向的,  // 即管道双方都可以进行读写操作  // 第三个参数protocol必须为0  if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "socketpair() failed while spawning \"%s\"", name);
   return NGX_INVALID_PID;
  }

  ngx_log_debug2(NGX_LOG_DEBUG_CORE, cycle->log, 0,
          "channel %d:%d",
          ngx_processes[s].channel[0],
          ngx_processes[s].channel[1]);

  // 将ngx_processes[s].channel[0]设置为非阻塞模式  if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          ngx_nonblocking_n
            " failed while spawning \"%s\"",
          name);
   ngx_close_channel(ngx_processes[s].channel, cycle->log);
   return NGX_INVALID_PID;
  }

  // 将ngx_processes[s].channel[1]设置为非阻塞模式  if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          ngx_nonblocking_n
            " failed while spawning \"%s\"",
          name);
   ngx_close_channel(ngx_processes[s].channel, cycle->log);
   return NGX_INVALID_PID;
  }

  on = 1;
  // 将ngx_processes[s].channel[0]套接字管道设置为异步模式  if (ioctl(ngx_processes[s].channel[0], FIOASYNC, &on) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "ioctl(FIOASYNC) failed while spawning \"%s\"", name);
   ngx_close_channel(ngx_processes[s].channel, cycle->log);
   return NGX_INVALID_PID;
  }

  // 当前还处于主进程中,这里的ngx_pid指向了主进程的进程id,当前方法的作用主要是将  // ngx_processes[s].channel[0]的操作权限设置给主进程,也就是说主进程通过向  // ngx_processes[s].channel[0]写入和读取数据来与子进程进行通信  if (fcntl(ngx_processes[s].channel[0], F_SETOWN, ngx_pid) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "fcntl(F_SETOWN) failed while spawning \"%s\"", name);
   ngx_close_channel(ngx_processes[s].channel, cycle->log);
   return NGX_INVALID_PID;
  }

  // FD_CLOEXEC表示当前指定的套接字管道在子进程中可以使用,但是在execl()执行的程序中不可使用  if (fcntl(ngx_processes[s].channel[0], F_SETFD, FD_CLOEXEC) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
          name);
   ngx_close_channel(ngx_processes[s].channel, cycle->log);
   return NGX_INVALID_PID;
  }

  // FD_CLOEXEC表示当前指定的套接字管道在子进程中可以使用,但是在execl()执行的程序中不可使用  if (fcntl(ngx_processes[s].channel[1], F_SETFD, FD_CLOEXEC) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
          name);
   ngx_close_channel(ngx_processes[s].channel, cycle->log);
   return NGX_INVALID_PID;
  }

  // ngx_processes[s].channel[1]是用于给子进程监听相关事件使用的,当父进程向  // ngx_processes[s].channel[0]发布事件之后,ngx_processes[s].channel[1]中就会接收到  // 对应的事件,从而进行相应的处理  ngx_channel = ngx_processes[s].channel[1];

 } else {
  // 如果是NGX_PROCESS_DETACHED模式,则表示当前是另外新起的一个master进程,因而将其管道值都置为-1  ngx_processes[s].channel[0] = -1;
  ngx_processes[s].channel[1] = -1;
 }

 ngx_process_slot = s;


 // fork()方法将产生一个新的进程,这个进程与父进程的关系是子进程的内存数据将完全复制父进程的。 // 还需要注意的是,fork()出来的子进程执行的代码是从fork()之后开始执行的,而对于父进程而言, // 该方法的返回值为父进程id,而对于子进程而言,该方法返回值为0,因而通过if-else语句就可以让父进程 // 和子进程分别调用后续不同的代码片段 pid = fork();

 switch (pid) {

  case -1:
   // fork出错   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "fork() failed while spawning \"%s\"", name);
   ngx_close_channel(ngx_processes[s].channel, cycle->log);
   return NGX_INVALID_PID;

  case 0:
   // 子进程执行的分支,这里的proc()方法是外部传进来的,也就是说,当前方法只是创建一个新的进程,   // 具体的进程处理逻辑,将交由外部代码块进行定义ngx_getpid()方法获取的就是当前新创建的子进程的进程id   ngx_pid = ngx_getpid();
   proc(cycle, data);
   break;

  default:
   // 父进程会走到这里   break;
 }

 ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start %s %P", name, pid);

 // 父进程会走到这里,当前的pid是fork()之后父进程得到的新创建的子进程的pid ngx_processes[s].pid = pid;
 ngx_processes[s].exited = 0;

 if (respawn >= 0) {
  return pid;
 }

 // 设置当前进程的各个属性,并且存储到ngx_processes数组中的对应位置 ngx_processes[s].proc = proc;
 ngx_processes[s].data = data;
 ngx_processes[s].name = name;
 ngx_processes[s].exiting = 0;

 switch (respawn) {

  case NGX_PROCESS_NORESPAWN:
   ngx_processes[s].respawn = 0;
   ngx_processes[s].just_spawn = 0;
   ngx_processes[s].detached = 0;
   break;

  case NGX_PROCESS_JUST_SPAWN:
   ngx_processes[s].respawn = 0;
   ngx_processes[s].just_spawn = 1;
   ngx_processes[s].detached = 0;
   break;

  case NGX_PROCESS_RESPAWN:
   ngx_processes[s].respawn = 1;
   ngx_processes[s].just_spawn = 0;
   ngx_processes[s].detached = 0;
   break;

  case NGX_PROCESS_JUST_RESPAWN:
   ngx_processes[s].respawn = 1;
   ngx_processes[s].just_spawn = 1;
   ngx_processes[s].detached = 0;
   break;

  case NGX_PROCESS_DETACHED:
   ngx_processes[s].respawn = 0;
   ngx_processes[s].just_spawn = 0;
   ngx_processes[s].detached = 1;
   break;
 }

 if (s == ngx_last_process) {
  ngx_last_process++;
 }

 return pid;
}

        ngx_spawn_process()方法最后会fork()一个子进程以执行其第二个参数所指定的回调方法。但是在这之前,我们需要说明的是,其通过socketpair()方法调用会创建一对匿名的socket,然后将其存储在当前进程的channel数组中,如此就完成了channel数组的创建。

        worker进程启动之后会执行ngx_worker_process_cycle()方法,该方法首先会对worker进程进行初始化,其中就包括对继承而来的channel数组的处理。由于master进程和worker进程都保有channel数组所指代的socket描述符,而本质上master进程和各个worker进程只需要保有该数组的某一边的描述符即可。因而这里worker进程在初始化过程中,会关闭其所保存的另一边的描述符。在nginx中,master进程统一的会保留channel数组的0号位的socket描述符,关闭1号位的socket描述符,而worker进程则会关闭0号位的socket描述符,保留1号位的描述符。这样master进程需要与worker进程通信时,就只需要往channel[0]中写入数据,而worker进程则会监听channel[1],从而接收到master进程的数据写入。这里我们首先看一下worker进程的初始化方法ngx_worker_process_init()的源码:


分享到:

相关信息

热门资讯

频道热点资讯