RELATEED CONSULTING
相关咨询
选择下列产品马上在线沟通
服务时间:8:30-17:00
你可能遇到了下面的问题
关闭右侧工具栏

新闻中心

这里有您想知道的互联网营销解决方案
nginx中worker进程循环的实现示例

这篇文章给大家分享的是有关nginx中worker进程循环的实现示例的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。

成都创新互联公司成立于2013年,先为万州等服务建站,万州等地企业,进行企业商务咨询服务。为万州企业网站制作PC+手机+微官网三网同步一站式服务解决您的所有建站问题。

worker进程启动后,其首先会初始化自身运行所需要的环境,然后会进入一个循环,在该循环中不断检查是否有需要执行的事件,然后处理事件。在这个过程中,worker进程也是需要与master进程交互的,更有甚者,worker进程作为一个子进程,也是可以接收命令行指令(比如kill等)以进行相应逻辑的处理的。那么worker进程是如何与master或者命令行指令进行交互的呢?本文首先会对worker进程与master进程交互方式,以及worker进程如何处理命令行指令的流程进行讲解,然后会从源码上对worker进程交互的整个工作流程进行介绍。

1. worker与master进程交互方式

这里首先需要说明的是,无论是master还是外部命令的方式,nginx都是通过标志位的方式来处理相应的指令的,也即在接收到一个指令(无论是master还是外部命令)的时候,worker会在其回调方法中设置与该指令相对应的标志位,然后在worker进程在其自身的循环中处理完事件之后会依次检查这些标志位是否为真,是则根据该标志位的作用执行相应的逻辑。

对于worker进程与master进程的交互,其是通过socket管道的方式进行的。在ngx_process.h文件中声明了一个ngx_process_t结构体,这里我们主要关注其channel属性:

typedef struct {
  // 其余属性...
  
  ngx_socket_t channel[2];
} ngx_process_t;

        这里的ngx_process_t结构体的作用是存储某个进程相关的信息的,比如pid、channel、status等。每个进程中都有一个ngx_processes数组,数组元素就是这里的ngx_process_t结构体,也就是说每个进程都会通过ngx_processes数组保存其余进程的基本信息。其声明如下:

// 存储了nginx中所有的子进程数组,每个子进程都有一个对应的ngx_process_t结构体进行标记
extern ngx_process_t ngx_processes[NGX_MAX_PROCESSES];
        这里我们就可以看出,每个进程都会一个与之对应的channel数组,这个数组的长度为2,其是与master进程进行交互的管道流。在master进程创建每一个子进程的之前,都会创建一个channel数组,该数组的创建方法为:

int socketpair(int domain, int type, int protocol, int sv[2]);
        这个方法的主要作用是创建一对匿名的已经连接的套接字,也就是说,如果在一个套接字中写入数据,那么在另一个套接字中就可以接收到写入的数据。通过这种方式,如果在父进程中往管道的一边写入数据,那么在子进程就可以在另一边接收到数据,这样就可以实现父子进程的数据通信了。

        在master进程启动完子进程之后,子进程会保有master进程中相应的数据,也包括这里的channel数组。如此,master进程就可以通过channel数组实现与子进程的通信了。

2. worker处理外部命令

        对于外部命令,其本质上是通过signals数组中定义的各个信号以及回调方法进行处理的。在master进程初始化基本环境的时候,会将signals数组中指定的信号回调方法设置到对应的信号中。由于worker进程会继承master进程的基本环境,因而worker进程在接收到这里设置的信号之后,也会调用对应的回调方法。而该回调方法的主要逻辑也仅仅只是设置相应的标志位的值。关于nginx接收到信号之后如何设置对应的标志位,可以参照本人前面的文章(nginx master工作循环 超链接),这里不再赘述。

3. 源码讲解

        master进程是通过ngx_start_worker_processes()方法启动各个子进程的,如下是该方法源码:

/**
 * 启动n个worker子进程,并设置好每个子进程与master父进程之间使用socketpair
 * 系统调用建立起来的socket句柄通信机制
 */
static void ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type) {
 ngx_int_t i;
 ngx_channel_t ch;
 
 ngx_memzero(&ch, sizeof(ngx_channel_t));
 ch.command = NGX_CMD_OPEN_CHANNEL;

 for (i = 0; i < n; i++) {

  // spawn是产卵的意思,这里就是生成一个子进程的意思,而该子进程所进行的事件循环就是
  // ngx_worker_process_cycle()方法,这里的ngx_worker_process_cycle是worker进程处理事件的循环,
  // worker进程在一个无限for循环中,不断的检查相应的事件模型中是否存在对应的事件,
  // 然后将accept事件和read、write事件分开放入两个队列中,最后在事件循环中不断的处理事件
  ngx_spawn_process(cycle, ngx_worker_process_cycle, 
           (void *) (intptr_t) i, "worker process", type);

  // 下面的这段代码的主要作用是将新建进程这个事件通知到其他的进程,上面的
  // ch.command = NGX_CMD_OPEN_CHANNEL;中NGX_CMD_OPEN_CHANNEL表示的就是当前是新建了一个进程,
  // 而ngx_process_slot存储的就是该新建进程所存放的数组位置,这里需要进行广播的原因在于,
  // 每个子进程被创建后,其内存数据都是复制的父进程的,但是ngx_processes数组是每个进程都有一份的,
  // 因而数组中先创建的子进程是没有后创建的子进程的数据的,但是master进程是有所有子进程的数据的,
  // 因而这里master进程创建子进程之后,其就会向ngx_processes数组的每个进程的channel[0]上
  // 写入当前广播的事件,也即这里的ch,通过这种方式,每个子进程接收到这个事件之后,
  // 都会尝试更新其所保存的ngx_processes数据信息
  ch.pid = ngx_processes[ngx_process_slot].pid;
  ch.slot = ngx_process_slot;
  ch.fd = ngx_processes[ngx_process_slot].channel[0];

  // 广播事件
  ngx_pass_open_channel(cycle, &ch);
 }
}

        这里我们主要需要关注上面的启动子进程的方法调用,也即这里的ngx_spawn_process()方法,该方法的第二个参数是一个方法,在启动子进程之后,子进程就会进入该方法所指定的循环中。而在ngx_spawn_process()方法中,master进程会为当前新创建的子进程创建一个channel数组,以用于与当前子进程进行通信。如下是ngx_spawn_process()方法的源码:

ngx_pid_t ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data, char *name, ngx_int_t respawn) {
 u_long on;
 ngx_pid_t pid;
 ngx_int_t s;

 if (respawn >= 0) {
  s = respawn;

 } else {
  // 在ngx_processes数组中存储了当前创建的所有进程,而ngx_last_process则是当前当前记录的最后一个
  // process在ngx_processes中的下一个位置的索引,只不过ngx_processes中记录的进程有可能有部分
  // 已经失效了。当前循环就是从头开始查找是否有某个进程已经失效了,如果已经失效了,则复用该进程位置,
  // 否则直接使用ngx_last_process所指向的位置
  for (s = 0; s < ngx_last_process; s++) {
   if (ngx_processes[s].pid == -1) {
    break;
   }
  }

  // 这里说明所创建的进程数达到了最大限度
  if (s == NGX_MAX_PROCESSES) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
          "no more than %d processes can be spawned",
          NGX_MAX_PROCESSES);
   return NGX_INVALID_PID;
  }
 }

 // NGX_PROCESS_DETACHED标志表示当前fork出来的进程与原来的父进程没有任何关系,比如进行nginx升级时,
 // 新生成的master进程就与原先的master进程没有关系
 if (respawn != NGX_PROCESS_DETACHED) {

  /* Solaris 9 still has no AF_LOCAL */

  // 这里的socketpair()方法的主要作用是生成一对套接字流,用于主进程和子进程的通信,这一对套接字会
  // 存储在ngx_processes[s].channel中,本质上这个字段是一个长度为2的整型数组。在主进程和子进程
  // 进行通信的之前,主进程会关闭其中一个,而子进程会关闭另一个,然后相互之间往未关闭的另一个文件描述符中
  // 写入或读取数据即可实现通信。
  // AF_UNIX表示当前使用的是UNIX文件形式的socket地址族
  // SOCK_STREAM指定了当前套接字建立的通信方式是管道流,并且这个管道流是双向的,
  // 即管道双方都可以进行读写操作
  // 第三个参数protocol必须为0
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "socketpair() failed while spawning \"%s\"", name);
   return NGX_INVALID_PID;
  }

  ngx_log_debug2(NGX_LOG_DEBUG_CORE, cycle->log, 0,
          "channel %d:%d",
          ngx_processes[s].channel[0],
          ngx_processes[s].channel[1]);

  // 将ngx_processes[s].channel[0]设置为非阻塞模式
  if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          ngx_nonblocking_n
            " failed while spawning \"%s\"",
          name);
   ngx_close_channel(ngx_processes[s].channel, cycle->log);
   return NGX_INVALID_PID;
  }

  // 将ngx_processes[s].channel[1]设置为非阻塞模式
  if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          ngx_nonblocking_n
            " failed while spawning \"%s\"",
          name);
   ngx_close_channel(ngx_processes[s].channel, cycle->log);
   return NGX_INVALID_PID;
  }

  on = 1;
  // 将ngx_processes[s].channel[0]套接字管道设置为异步模式
  if (ioctl(ngx_processes[s].channel[0], FIOASYNC, &on) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "ioctl(FIOASYNC) failed while spawning \"%s\"", name);
   ngx_close_channel(ngx_processes[s].channel, cycle->log);
   return NGX_INVALID_PID;
  }

  // 当前还处于主进程中,这里的ngx_pid指向了主进程的进程id,当前方法的作用主要是将
  // ngx_processes[s].channel[0]的操作权限设置给主进程,也就是说主进程通过向
  // ngx_processes[s].channel[0]写入和读取数据来与子进程进行通信
  if (fcntl(ngx_processes[s].channel[0], F_SETOWN, ngx_pid) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "fcntl(F_SETOWN) failed while spawning \"%s\"", name);
   ngx_close_channel(ngx_processes[s].channel, cycle->log);
   return NGX_INVALID_PID;
  }

  // FD_CLOEXEC表示当前指定的套接字管道在子进程中可以使用,但是在execl()执行的程序中不可使用
  if (fcntl(ngx_processes[s].channel[0], F_SETFD, FD_CLOEXEC) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
          name);
   ngx_close_channel(ngx_processes[s].channel, cycle->log);
   return NGX_INVALID_PID;
  }

  // FD_CLOEXEC表示当前指定的套接字管道在子进程中可以使用,但是在execl()执行的程序中不可使用
  if (fcntl(ngx_processes[s].channel[1], F_SETFD, FD_CLOEXEC) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
          name);
   ngx_close_channel(ngx_processes[s].channel, cycle->log);
   return NGX_INVALID_PID;
  }

  // ngx_processes[s].channel[1]是用于给子进程监听相关事件使用的,当父进程向
  // ngx_processes[s].channel[0]发布事件之后,ngx_processes[s].channel[1]中就会接收到
  // 对应的事件,从而进行相应的处理
  ngx_channel = ngx_processes[s].channel[1];

 } else {
  // 如果是NGX_PROCESS_DETACHED模式,则表示当前是另外新起的一个master进程,因而将其管道值都置为-1
  ngx_processes[s].channel[0] = -1;
  ngx_processes[s].channel[1] = -1;
 }

 ngx_process_slot = s;


 // fork()方法将产生一个新的进程,这个进程与父进程的关系是子进程的内存数据将完全复制父进程的。
 // 还需要注意的是,fork()出来的子进程执行的代码是从fork()之后开始执行的,而对于父进程而言,
 // 该方法的返回值为父进程id,而对于子进程而言,该方法返回值为0,因而通过if-else语句就可以让父进程
 // 和子进程分别调用后续不同的代码片段
 pid = fork();

 switch (pid) {

  case -1:
   // fork出错
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "fork() failed while spawning \"%s\"", name);
   ngx_close_channel(ngx_processes[s].channel, cycle->log);
   return NGX_INVALID_PID;

  case 0:
   // 子进程执行的分支,这里的proc()方法是外部传进来的,也就是说,当前方法只是创建一个新的进程,
   // 具体的进程处理逻辑,将交由外部代码块进行定义ngx_getpid()方法获取的就是当前新创建的子进程的进程id
   ngx_pid = ngx_getpid();
   proc(cycle, data);
   break;

  default:
   // 父进程会走到这里
   break;
 }

 ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start %s %P", name, pid);

 // 父进程会走到这里,当前的pid是fork()之后父进程得到的新创建的子进程的pid
 ngx_processes[s].pid = pid;
 ngx_processes[s].exited = 0;

 if (respawn >= 0) {
  return pid;
 }

 // 设置当前进程的各个属性,并且存储到ngx_processes数组中的对应位置
 ngx_processes[s].proc = proc;
 ngx_processes[s].data = data;
 ngx_processes[s].name = name;
 ngx_processes[s].exiting = 0;

 switch (respawn) {

  case NGX_PROCESS_NORESPAWN:
   ngx_processes[s].respawn = 0;
   ngx_processes[s].just_spawn = 0;
   ngx_processes[s].detached = 0;
   break;

  case NGX_PROCESS_JUST_SPAWN:
   ngx_processes[s].respawn = 0;
   ngx_processes[s].just_spawn = 1;
   ngx_processes[s].detached = 0;
   break;

  case NGX_PROCESS_RESPAWN:
   ngx_processes[s].respawn = 1;
   ngx_processes[s].just_spawn = 0;
   ngx_processes[s].detached = 0;
   break;

  case NGX_PROCESS_JUST_RESPAWN:
   ngx_processes[s].respawn = 1;
   ngx_processes[s].just_spawn = 1;
   ngx_processes[s].detached = 0;
   break;

  case NGX_PROCESS_DETACHED:
   ngx_processes[s].respawn = 0;
   ngx_processes[s].just_spawn = 0;
   ngx_processes[s].detached = 1;
   break;
 }

 if (s == ngx_last_process) {
  ngx_last_process++;
 }

 return pid;
}

        ngx_spawn_process()方法最后会fork()一个子进程以执行其第二个参数所指定的回调方法。但是在这之前,我们需要说明的是,其通过socketpair()方法调用会创建一对匿名的socket,然后将其存储在当前进程的channel数组中,如此就完成了channel数组的创建。

        worker进程启动之后会执行ngx_worker_process_cycle()方法,该方法首先会对worker进程进行初始化,其中就包括对继承而来的channel数组的处理。由于master进程和worker进程都保有channel数组所指代的socket描述符,而本质上master进程和各个worker进程只需要保有该数组的某一边的描述符即可。因而这里worker进程在初始化过程中,会关闭其所保存的另一边的描述符。在nginx中,master进程统一的会保留channel数组的0号位的socket描述符,关闭1号位的socket描述符,而worker进程则会关闭0号位的socket描述符,保留1号位的描述符。这样master进程需要与worker进程通信时,就只需要往channel[0]中写入数据,而worker进程则会监听channel[1],从而接收到master进程的数据写入。这里我们首先看一下worker进程的初始化方法ngx_worker_process_init()的源码:

/**
 * 这里主要是对当前进程进行初始化,为其设置优先级和打开的文件限制等参数。
 * 最后会为当前进程添加一个监听channel[1]的连接,以不断读取master进程的消息,从而进行相应的处理
 */
static void ngx_worker_process_init(ngx_cycle_t *cycle, ngx_int_t worker) {
 sigset_t set;
 ngx_int_t n;
 ngx_time_t *tp;
 ngx_uint_t i;
 ngx_cpuset_t *cpu_affinity;
 struct rlimit rlmt;
 ngx_core_conf_t *ccf;
 ngx_listening_t *ls;

 // 设置时区相关的信息
 if (ngx_set_environment(cycle, NULL) == NULL) {
  /* fatal */
  exit(2);
 }

 ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);

 // 设置当前进程的优先级
 if (worker >= 0 && ccf->priority != 0) {
  if (setpriority(PRIO_PROCESS, 0, ccf->priority) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "setpriority(%d) failed", ccf->priority);
  }
 }

 // 设置当前进程能够打开的文件句柄数
 if (ccf->rlimit_nofile != NGX_CONF_UNSET) {
  rlmt.rlim_cur = (rlim_t) ccf->rlimit_nofile;
  rlmt.rlim_max = (rlim_t) ccf->rlimit_nofile;

  if (setrlimit(RLIMIT_NOFILE, &rlmt) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "setrlimit(RLIMIT_NOFILE, %i) failed",
          ccf->rlimit_nofile);
  }
 }

 // Changes the limit on the largest size of a core file(RLIMIT_CORE) for worker processes.
 // 简而言之就是设置核心文件能够使用的最大大小
 if (ccf->rlimit_core != NGX_CONF_UNSET) {
  rlmt.rlim_cur = (rlim_t) ccf->rlimit_core;
  rlmt.rlim_max = (rlim_t) ccf->rlimit_core;

  if (setrlimit(RLIMIT_CORE, &rlmt) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "setrlimit(RLIMIT_CORE, %O) failed",
          ccf->rlimit_core);
  }
 }

 // geteuid()返回执行当前程序的用户id,这里的0表示是否为root用户
 if (geteuid() == 0) {
  // setgid()方法的作用是更改组的id
  if (setgid(ccf->group) == -1) {
   ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
          "setgid(%d) failed", ccf->group);
   /* fatal */
   exit(2);
  }

  // initgroups()是更改附加组的id
  if (initgroups(ccf->username, ccf->group) == -1) {
   ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
          "initgroups(%s, %d) failed",
          ccf->username, ccf->group);
  }

  // 更改用户的id
  if (setuid(ccf->user) == -1) {
   ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
          "setuid(%d) failed", ccf->user);
   /* fatal */
   exit(2);
  }
 }

 // 需要注意的是,对于cache manager和cache loader进程,这里的worker传入的是-1,
 // 表示这两个进程不需要设置亲核性
 if (worker >= 0) {
  // 获取当前worker的CPU亲核性
  cpu_affinity = ngx_get_cpu_affinity(worker);

  if (cpu_affinity) {
   // 设置worker的亲核心
   ngx_setaffinity(cpu_affinity, cycle->log);
  }
 }

#if (NGX_HAVE_PR_SET_DUMPABLE)
 if (prctl(PR_SET_DUMPABLE, 1, 0, 0, 0) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "prctl(PR_SET_DUMPABLE) failed");
 }

#endif

 if (ccf->working_directory.len) {
  // chdir()的作用是将当前的工作目录更改为其参数所传入的路径
  if (chdir((char *) ccf->working_directory.data) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "chdir(\"%s\") failed", ccf->working_directory.data);
   /* fatal */
   exit(2);
  }
 }

 // 初始化空的set指令集合
 sigemptyset(&set);

 // ◆ SIG_BLOCK:将 set 参数指向信号集中的信号加入到信号掩码中。
 // ◆ SIG_UNBLOCK:将 set 参数指向的信号集中的信号从信号掩码中删除。
 // ◆ SIG_SETMASK:将 set 参数指向信号集设置为信号掩码。
 // 这里就是直接初始化要阻塞的信号集,默认为空集
 if (sigprocmask(SIG_SETMASK, &set, NULL) == -1) {
  ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
         "sigprocmask() failed");
 }

 tp = ngx_timeofday();
 srandom(((unsigned) ngx_pid << 16) ^ tp->sec ^ tp->msec);

 ls = cycle->listening.elts;
 for (i = 0; i < cycle->listening.nelts; i++) {
  ls[i].previous = NULL;
 }

 // 这里调用各个模块的init_process()方法进行进程模块的初始化
 for (i = 0; cycle->modules[i]; i++) {
  if (cycle->modules[i]->init_process) {
   if (cycle->modules[i]->init_process(cycle) == NGX_ERROR) {
    /* fatal */
    exit(2);
   }
  }
 }

 // 这里主要是关闭当前进程中其他各个进程的channel[1]管道句柄
 for (n = 0; n < ngx_last_process; n++) {

  if (ngx_processes[n].pid == -1) {
   continue;
  }

  if (n == ngx_process_slot) {
   continue;
  }

  if (ngx_processes[n].channel[1] == -1) {
   continue;
  }

  if (close(ngx_processes[n].channel[1]) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "close() channel failed");
  }
 }

 // 关闭当前进程的channel[0]管道句柄
 if (close(ngx_processes[ngx_process_slot].channel[0]) == -1) {
  ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
         "close() channel failed");
 }

#if 0
 ngx_last_process = 0;
#endif

 // ngx_channel指向的是当前进程的channel[1]句柄,也即监听master进程发送消息的句柄。
 // 当前方法中,首先会为当前的句柄创建一个connection对象,并且将其封装为一个事件,然后将该事件添加到
 // 对应的事件模型队列中以监听当前句柄的事件,事件的处理逻辑则主要有这里的ngx_channel_handler()
 // 方法进行。这里的ngx_channel_handler的主要处理逻辑是,根据当前收到的消息设置当前进程的一些标志位,
 // 或者更新某些缓存数据,如此,在当前进行的事件循环中,通过不断检查这些标志位,从而实现在事件进程中
 // 处理真正的逻辑。因而这里的ngx_channel_handler的处理效率是非常高的
 if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT,
              ngx_channel_handler)
   == NGX_ERROR) {
  /* fatal */
  exit(2);
 }
}

        该方法主要是对worker进程进行初始化,这里我们主要需要关注最后会遍历ngx_processes数组,这个数组中保存了当前nginx中各个进程的相关信息。在遍历过程中,会关闭当前进程保有的其余进程的channel[1]句柄,而保留有channel[0]句柄,这样当前进程如果需要与其他进程通信,也只需要往目标进程的channel[0]中写入数据即可。在遍历完成之后,当前进程就会关闭自身的channel[0]句柄,而保留channel[1]句柄。最后,会通过ngx_add_channel_event()方法为当前进程添加对channel[1]的监听事件,这里在调用ngx_add_channel_event()方法时传入的第二个参数是ngx_channel,该参数是在前面的ngx_spawn_process()方法中赋值的,指向的就是当前进程的channel[1]的socket句柄。

        关于ngx_add_channel_event()方法,其本质就是创建一个ngx_event_t结构体的事件,然后将其添加到当前所使用的事件模型(比如epoll)句柄中。这里不再赘述该方法的实现源码,不过我们需要关注的是该事件触发时的回调方法,即调用ngx_add_channel_event()方法时传入的第三个参数ngx_channel_handler()方法。如下是该方法的源码:

static void ngx_channel_handler(ngx_event_t *ev) {
 ngx_int_t n;
 ngx_channel_t ch;
 ngx_connection_t *c;

 if (ev->timedout) {
  ev->timedout = 0;
  return;
 }

 c = ev->data;

 for (;;) {

  // 在无限for循环中不断读取master进程发过来的消息
  n = ngx_read_channel(c->fd, &ch, sizeof(ngx_channel_t), ev->log);

  // 如果读取消息出错,说明当前的句柄可能失效了,就需要关闭当前连接
  if (n == NGX_ERROR) {
   if (ngx_event_flags & NGX_USE_EPOLL_EVENT) {
    ngx_del_conn(c, 0);
   }

   ngx_close_connection(c);
   return;
  }

  if (ngx_event_flags & NGX_USE_EVENTPORT_EVENT) {
   if (ngx_add_event(ev, NGX_READ_EVENT, 0) == NGX_ERROR) {
    return;
   }
  }

  if (n == NGX_AGAIN) {
   return;
  }

  // 对发送过来的消息进行处理
  switch (ch.command) {
   // 如果是quit消息,则设置quit标志位
   case NGX_CMD_QUIT:
    ngx_quit = 1;
    break;

    // 如果terminate消息,则设置terminate标志位
   case NGX_CMD_TERMINATE:
    ngx_terminate = 1;
    break;

    // 如果是reopen消息,则设置reopen标志位
   case NGX_CMD_REOPEN:
    ngx_reopen = 1;
    break;

    // 如果是新建进程消息,则更新当前ngx_processes数组对应位置的数据
   case NGX_CMD_OPEN_CHANNEL:
    ngx_processes[ch.slot].pid = ch.pid;
    ngx_processes[ch.slot].channel[0] = ch.fd;
    break;

    // 如果是关闭channel的消息,则关闭ngx_processes数组对应位置的句柄
   case NGX_CMD_CLOSE_CHANNEL:
    if (close(ngx_processes[ch.slot].channel[0]) == -1) {
     ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
            "close() channel failed");
    }

    ngx_processes[ch.slot].channel[0] = -1;
    break;
  }
 }
}

        在ngx_channel_handler()方法中,主要是读取所监听的socket句柄中的数据,而数据是以一个ngx_channel_t结构体所承载的,这个ngx_channel_t是nginx所统一使用的master与worker进程进行通信的结构体,其会指定当前发生的事件类型,以及发生该事件的进程信息。如下是ngx_channel_t结构体的声明:

typedef struct {
  // 当前发生的事件类型
  ngx_uint_t command;
  // 发生事件的pid
  ngx_pid_t pid;
  // 发生事件的进程在ngx_processes数组中的下标
  ngx_int_t slot;
  // 发生事件的进程的channel[0]描述符的值
  ngx_fd_t fd;
} ngx_channel_t;

       在从当前进程的channel[1]中读取了ngx_channel_t结构体的数据之后,ngx_channel_handler()方法会根据发生的事件类型更新相应的标志位的状态,并且会更新当前进程的ngx_processes数组中对应的发生事件的进程的状态信息。

        在处理了master进程所发送的事件之后,worker进程就会继续其循环,在该循环中会检查其所关注的标志位的状态,然后会根据这些状态执行对应的逻辑。如下是worker进程工作的循环的源码:

/**
 * 进入worker进程工作的循环
 */
static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data) {
 ngx_int_t worker = (intptr_t) data;

 ngx_process = NGX_PROCESS_WORKER;
 ngx_worker = worker;

 // 初始化worker进程,前面对该方法的源码进行了讲解
 ngx_worker_process_init(cycle, worker);

 ngx_setproctitle("worker process");

 for (;;) {

  if (ngx_exiting) {
   // 这里主要是检查有没有事件是非cancelable状态的,也就是说是否所有的事件都已经取消了,如果取消了,
   // 就会返回NGX_OK。这里的逻辑可以理解为,如果被标记为了ngx_exiting,那么此时,如果还有未取消的
   // 事件存在,则会走到下面的ngx_process_events_and_timers()方法,如此就会处理未完成的事件,
   // 然后在循环中再次走到这个位置,最终if条件为true,从而执行退出worker进程的工作
   if (ngx_event_no_timers_left() == NGX_OK) {
    ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");
    ngx_worker_process_exit(cycle);
   }
  }

  ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle");

  // 这里通过检查相应的事件模型中是否存在对应的事件,然后将其放入队列中进行处理,
  // 这里是worker进程处理事件的核心方法
  ngx_process_events_and_timers(cycle);

  // 这里ngx_terminate是强制关闭nginx的选项,如果向nginx发送了强制关闭nginx命令,则当前进程会直接退出
  if (ngx_terminate) {
   ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");
   ngx_worker_process_exit(cycle);
  }

  // 这里ngx_quit是优雅退出的选项。这里主要是将ngx_exiting置为1,用于表征当前进程需要退出,
  // 然后会执行如下三个工作:
  // 1. 往事件队列中添加一个事件,用于处理当前处于活跃状态的连接,将其close标志位置为1,并且执行该连接
  //  当前的处理方法,以尽快完成连接事件;
  // 2. 关闭当前cycle中监听的socket句柄;
  // 3. 将当前所有处于空闲状态的连接的close状态标记为1,然后调用其连接处理方法.
  if (ngx_quit) {
   ngx_quit = 0;
   ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "gracefully shutting down");
   ngx_setproctitle("worker process is shutting down");

   if (!ngx_exiting) {
    ngx_exiting = 1;
    ngx_set_shutdown_timer(cycle);
    ngx_close_listening_sockets(cycle);
    ngx_close_idle_connections(cycle);
   }
  }

  // ngx_reopen主要是重新打开nginx的所有文件,比如切换nginx的日志文件等等
  if (ngx_reopen) {
   ngx_reopen = 0;
   ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs");
   ngx_reopen_files(cycle, -1);
  }
 }
}

        可以看到,worker进程主要处理了nginx是否退出相关的标志位,还处理了nginx是否重新读取了配置文件的标志位。

感谢各位的阅读!关于“nginx中worker进程循环的实现示例”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!


新闻名称:nginx中worker进程循环的实现示例
网站网址:http://scpingwu.com/article/gsegdh.html