On Thu, Dec 11, 2014 at 06:11:00PM -0500, Sean Conner wrote:
> It was thus said that the Great William Ahern once stated:
> > On Thu, Dec 11, 2014 at 12:31:28PM -0500, Sean Conner wrote:
> > > It was thus said that the Great Valerio Schiavoni once stated:
> > > > 
> > > > Due to continuous 'timeout' errors returned by the receiving socket (which
> > > > is set to nonblocking, socket:settimeout(0)), the coroutine continuously
> > > > yields: in this case, is the reception of the data somehow put 'on hold'?
> > > 
> > >   You don't need to set the socket to non-blocking if you use select just
> > > for reading (and I just checked our codebase at work---I don't set any of
> > > the sockets to nonblocking).  All select() does is indicate when a file
> > > descriptor (in your case, a network socket) has received data (ready for
> > > reading), can write data without blocking (ready for writing [1]) or an
> > > error happened.
> > 
> > This is not true at all, and people who assume this are running code with
> > TOCTTOU bugs. Spurious wakeups can be very common. There are two common
> > scenarios.
> > 
> > 1) Two threads reading from the same socket. Both get woken up, one drains
> > the socket, the other ends up stalled. This is easy to avoid, but proves the
> > point about the semantics of readiness notification. It applies equally to
> > writability as to readability.
> 
>   Not true for Linux any more (this is the "thundering herd" problem).  Now,
> only one thread/process is woken up---the one that actually received the
> packet.

The thundering herd problem still exists except under very narrow
conditions. In particular, it's only fixed when at least one thread is
already blocking on a read or accept call (i.e. not polling). Only then can
the kernel atomically clear the ready state and dequeue the data.

The kernel must ensure that the event was consumed before it can clear the
ready state. Otherwise you've simply introduced a ton of very ugly race
conditions and violated the semantics of level-triggered readiness
signaling. Imagine two threads each polling on a socket, and the kernel
chose to notify only one of them (via epoll, poll, or select). Except,
before that thread can process the notification, it first processes some
other notification which causes it to become CPU-bound scanning a large
file, or worse, to exit. Then the event is lost, temporarily or even
permanently, causing latency and stalling.

There's no code in the kernel that says, "hey, we notified this thread 200ms
ago about a socket event but it never cleared the ready state, so let's
notify another thread." To prove my point, I've attached a sample program
which writes a character to a shared socket. Multiple threads will then poll
on the socket using epoll and EPOLLIN. You can tweak the behavior using the
following command-line switches:

  -s  threads share the same epoll queue

  -e  threads apply EPOLLET. notice that you get different behavior
      depending on whether the threads share the same epoll queue. that's
      because only 1 thread can enter the epoll_wait loop in the kernel
      for any single epoll queue.

  -o  threads apply EPOLLONESHOT. you see similar behavior to EPOLLET.

  -n  sets O_NONBLOCK on the socket. shouldn't see any difference.
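
To build and run it, something like the following should work (the source
file name is whatever you save the attachment as; the statement-expression
macros and pragmas assume GCC):

  $ gcc -std=gnu99 -pthread -o epoll-herd epoll-herd.c
  $ ./epoll-herd -e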

The only recent development in the past decade regarding thundering herd was
the addition of SO_REUSEPORT in kernel 3.9, which adopts the BSD identifier
but has incompatible semantics. See https://lwn.net/Articles/542629/

With SO_REUSEPORT, the kernel will direct an incoming TCP connection to a
particular socket handle. But it has nothing to do with threads. For the
solution to work, each thread must derive an independent socket file handle
by binding to the port separately. Each incoming connection is then assigned
to exactly one file handle. If multiple threads were polling on the same
socket file handle, they'd all be woken.
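
Roughly, the per-thread setup looks like this (a minimal sketch, assuming
kernel and libc headers new enough to define SO_REUSEPORT; the port number
and thread count are arbitrary):

#include <string.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <pthread.h>
#include <unistd.h>
#include <err.h>

static void *worker(void *arg) {
	struct sockaddr_in sin;
	int fd, on = 1;
	(void)arg;

	if (-1 == (fd = socket(AF_INET, SOCK_STREAM, 0)))
		err(1, "socket");
	/* each worker sets SO_REUSEPORT on its own socket before bind */
	if (0 != setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof on))
		err(1, "setsockopt");

	memset(&sin, 0, sizeof sin);
	sin.sin_family = AF_INET;
	sin.sin_port = htons(8080);
	sin.sin_addr.s_addr = htonl(INADDR_ANY);

	if (0 != bind(fd, (struct sockaddr *)&sin, sizeof sin))
		err(1, "bind");
	if (0 != listen(fd, SOMAXCONN))
		err(1, "listen");

	for (;;) {
		/* the kernel directs each connection to exactly one listener */
		int cfd = accept(fd, NULL, NULL);
		if (-1 == cfd)
			err(1, "accept");
		close(cfd);
	}
}

int main(void) {
	pthread_t thread[3];
	int error;

	for (size_t i = 0; i < sizeof thread / sizeof *thread; i++) {
		if ((error = pthread_create(&thread[i], NULL, &worker, NULL)))
			errx(1, "pthread_create: %s", strerror(error));
	}
	pthread_join(thread[0], NULL); /* runs until interrupted */

	return 0;
}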

If you follow the comments on the LKML regarding the SO_REUSEPORT patch, the
context in which the thundering herd problem is mitigated becomes clear.

> > Another poor assumption people make is thinking they don't need to set UDP
> > sockets to non-blocking when writing, or even bother with polling for write
> > readiness. Because UDP is lossy they assume that the kernel will simply drop
> > any packet that won't fit in the output queue, immediately returning control
> > to the writing thread. Not true, at least not on Linux. OTOH, Linux has a
> > default socket buffer size of 65536, which means, again, you only trigger
> > this bug under very heavy UDP load.
> 
>   I'm not sure what distribution you are using, but I'm still running a
> 2.6.9 kernel (32b) at home, and it reports a default UDP buffer size of
> 110592; at work I'm running Linux 3.2.0 (64b) and it reports a default UDP
> buffer size of 229376.  
> 

Indeed. So for you it's 2x or 4x what I reported. On the most recent server
where I helped somebody fix this problem (a Java cluster doing billions of
UDP requests daily), it was 64k, but perhaps that was something they had set
themselves.

But you're only proving my point. The point was that the huge buffers common
on Linux mask the problem except under heavy load. It's not until you begin
to saturate the network and fill the buffer that your threads stall writing
UDP packets, unless you set O_NONBLOCK.
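
The fix is cheap. Something along these lines does it (a sketch, not part of
the attached program; socket creation elided, and udp_trysend is just an
illustrative name):

#include <errno.h>
#include <fcntl.h>
#include <sys/socket.h>

static int setnonblock(int fd) {
	int flags = fcntl(fd, F_GETFL);
	if (-1 == flags)
		return -1;
	return fcntl(fd, F_SETFL, flags|O_NONBLOCK);
}

/* 0: sent; 1: send buffer full (packet dropped); -1: hard error */
static int udp_trysend(int fd, const void *msg, size_t len) {
	if (-1 == send(fd, msg, len, 0)) {
		if (errno == EAGAIN || errno == EWOULDBLOCK)
			return 1; /* drop it, or poll for write readiness instead */
		return -1;
	}
	return 0;
}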

#define _GNU_SOURCE
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/time.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
#include <pthread.h>
#include <err.h>

#if __STDC_VERSION__ < 199901L
#pragma GCC warning "Should compile as C99 or higher"
#endif

#pragma GCC diagnostic warning "-Wall"
#pragma GCC diagnostic warning "-Wextra"

#define countof(a) (sizeof (a) / sizeof *(a))

/* Serialize stderr so messages from concurrent threads don't interleave.
 * The bodies can call the real err/warn functions because function-like
 * macros don't expand recursively. */
#define err(...) ({ flockfile(stderr); err(__VA_ARGS__); funlockfile(stderr); })
#define errx(...) ({ flockfile(stderr); errx(__VA_ARGS__); funlockfile(stderr); })
#define warn(...) ({ flockfile(stderr); warn(__VA_ARGS__); funlockfile(stderr); })
#define warnx(...) ({ flockfile(stderr); warnx(__VA_ARGS__); funlockfile(stderr); })

struct {
	int epollfd;
	int events;
	pthread_mutex_t mux;
	pthread_cond_t cv;
	size_t ready;
} Main = {
	.epollfd = -1,
	.events = EPOLLIN,
	.mux = PTHREAD_MUTEX_INITIALIZER,
	.cv = PTHREAD_COND_INITIALIZER,
};

static void *dopoll(void *arg) {
	int fd = (int)(intptr_t)arg, epfd, tid, error;
	struct epoll_event event;
	struct timeval deadline, now;

	/* use the shared epoll queue if -s was given, else create a per-thread one */
	if (-1 == (epfd = Main.epollfd) && -1 == (epfd = epoll_create1(0)))
		err(1, "epoll_create");

	if (0 != epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &(struct epoll_event){ Main.events, { 0 } })) {
		if (errno != EEXIST) /* get EEXIST if threads sharing epoll queue */
			err(1, "epoll_ctl");
	}

	if ((error = pthread_mutex_lock(&Main.mux)))
		errx(1, "pthread_mutex_lock: %s", strerror(error));
	tid = ++Main.ready;
	if ((error = pthread_mutex_unlock(&Main.mux)))
		errx(1, "pthread_mutex_unlock: %s", strerror(error));
	if ((error = pthread_cond_signal(&Main.cv)))
		errx(1, "pthread_cond_signal: %s", strerror(error));

	warnx("thread %d polling (fd:%d epfd:%d)", tid, fd, epfd);

	switch (epoll_wait(epfd, &event, 1, 5000)) {
	case 0:
		errno = ETIMEDOUT;
		/* FALL THROUGH */
	case -1:
		err(1, "thread %d: epoll_wait", tid);
		break;
	default:
		gettimeofday(&now, NULL);
		warnx("thread %d awoke at %ld.%ld", tid, now.tv_sec, now.tv_usec);
		break;
	}

	/* busy-wait for 5 seconds to simulate a CPU-bound thread that is
	 * holding (not clearing) the readiness event */
	gettimeofday(&deadline, NULL);
	deadline.tv_sec += 5;
	do {
		gettimeofday(&now, NULL);
	} while (timercmp(&deadline, &now, >));

	warnx("thread %d exiting", tid);

	return NULL;
} /* dopoll() */

int main(int argc, char *argv[]) {
	pthread_t thread[3];
	int fd[2], flags, optc, error;

	if (0 != socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC, fd))
		err(1, "socketpair");

	while (-1 != (optc = getopt(argc, argv, "seon"))) {
		switch (optc) {
		case 's':
			warnx("using shared epoll queue");
			if (-1 == (Main.epollfd = epoll_create1(0)))
				err(1, "epoll_create");
			break;
		case 'e':
			warnx("setting EPOLLET");
			Main.events |= EPOLLET;
			break;
		case 'o':
			warnx("setting EPOLLONESHOT");
			Main.events |= EPOLLONESHOT;
			break;
		case 'n':
			warnx("setting O_NONBLOCK on read fd");
			if (-1 == (flags = fcntl(fd[0], F_GETFL)))
				err(1, "fcntl");
			if (0 != fcntl(fd[0], F_SETFL, flags|O_NONBLOCK))
				err(1, "fcntl");
			break;
		}
	}

	for (size_t i = 0; i < countof(thread); i++) {
		if ((error = pthread_create(&thread[i], NULL, &dopoll, (void *)(intptr_t)fd[0])))
			errx(1, "pthread_create: %s", strerror(error));
	}

	sleep(1); /* give time for threads to enter quiescent state */
	warnx("doing socket write");
	if (1 != write(fd[1], "A", 1))
		err(1, "write");

	if ((error = pthread_mutex_lock(&Main.mux)))
		errx(1, "pthread_mutex_lock: %s", strerror(error));

	while (Main.ready < countof(thread)) {
		if ((error = pthread_cond_wait(&Main.cv, &Main.mux)))
			errx(1, "pthread_cond_wait: %s", strerror(error));
	}

	if ((error = pthread_mutex_unlock(&Main.mux)))
		errx(1, "pthread_mutex_unlock: %s", strerror(error));

	for (size_t i = 0; i < countof(thread); i++) {
		if ((error = pthread_join(thread[i], NULL)))
			errx(1, "pthread_join: %s", strerror(error));
	}

	return 0;
}