Skip to content
This repository was archived by the owner on May 4, 2018. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/uv-win.h
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,8 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
int queue_len; \
} pending_ipc_info; \
uv_write_t* non_overlapped_writes_tail; \
uv_mutex_t readfile_mutex; \
volatile HANDLE readfile_thread; \
void* reserved;

#define UV_PIPE_PRIVATE_FIELDS \
Expand Down
4 changes: 4 additions & 0 deletions src/win/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ extern UV_THREAD_LOCAL int uv__crt_assert_enabled;
/* Only used by uv_pipe_t handles. */
#define UV_HANDLE_NON_OVERLAPPED_PIPE 0x01000000
#define UV_HANDLE_PIPESERVER 0x02000000
#define UV_HANDLE_PIPE_READ_CANCELABLE 0x04000000

/* Only used by uv_tty_t handles. */
#define UV_HANDLE_TTY_READABLE 0x01000000
Expand Down Expand Up @@ -181,6 +182,9 @@ int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle,
int uv_pipe_write2(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle,
const uv_buf_t bufs[], unsigned int nbufs, uv_stream_t* send_handle,
uv_write_cb cb);
void uv__pipe_pause_read(uv_pipe_t* handle);
void uv__pipe_unpause_read(uv_pipe_t* handle);
void uv__pipe_stop_read(uv_pipe_t* handle);

void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
uv_req_t* req);
Expand Down
105 changes: 102 additions & 3 deletions src/win/pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
handle->pending_ipc_info.queue_len = 0;
handle->ipc = ipc;
handle->non_overlapped_writes_tail = NULL;
handle->readfile_thread = NULL;

uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req);

Expand All @@ -112,6 +113,12 @@ static void uv_pipe_connection_init(uv_pipe_t* handle) {
uv_connection_init((uv_stream_t*) handle);
handle->read_req.data = handle;
handle->eof_timer = NULL;
assert(!(handle->flags & UV_HANDLE_PIPESERVER));
if (pCancelSynchronousIo &&
handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
uv_mutex_init(&handle->readfile_mutex);
handle->flags |= UV_HANDLE_PIPE_READ_CANCELABLE;
}
}


Expand Down Expand Up @@ -321,6 +328,11 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
FILE_PIPE_LOCAL_INFORMATION pipe_info;
uv__ipc_queue_item_t* item;

if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
handle->flags &= ~UV_HANDLE_PIPE_READ_CANCELABLE;
uv_mutex_destroy(&handle->readfile_mutex);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Always destroy the mutex, since you'll always have it (see my first comment)


if ((handle->flags & UV_HANDLE_CONNECTION) &&
handle->shutdown_req != NULL &&
handle->write_reqs_pending == 0) {
Expand Down Expand Up @@ -658,12 +670,49 @@ void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
}


void uv__pipe_pause_read(uv_pipe_t* handle) {
if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
/* Pause the ReadFile task briefly, to work
around the Windows kernel bug that causes
any access to a NamedPipe to deadlock if
any process has called ReadFile */
HANDLE h;
uv_mutex_lock(&handle->readfile_mutex);
h = handle->readfile_thread;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put this in some uv__pipe_cancel_read? The fact that a "lock" function also does the cancel reads a bit weird.

while (h) {
/* spinlock: we expect this to finish quickly,
or we are probably about to deadlock anyways
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/anyways/anyway/ (sorry, caouldn't help it :-P)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/caouldn't/couldn't/ (sorry, couldn't help that either :P)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On 08/05/2014 02:36 AM, Jameson Nash wrote:

In src/win/pipe.c:

@@ -657,12 +675,40 @@ void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
return;
}

+uv_mutex_t uv_pipe_readfile_pause(const uv_pipe_t handle) {

  • uv_mutex_t *m = handle->readfile_mutex;
  • if (m) {
  •  /\* Pause the ReadFile task briefly, to work
    
  •     around the Windows kernel bug that causes
    
  •     any access to a NamedPipe to deadlock if
    
  •     any process has called ReadFile */
    
  •  HANDLE h;
    
  •  uv_mutex_lock(m);
    
  •  h = handle->readfile_thread;
    
  •  while (h) {
    
  •    /\* spinlock: we expect this to finish quickly,
    
  •       or we are probably about to deadlock anyways
    

s/caouldn't/couldn't/ (sorry, couldn't help that either :P)

Hahaha, touche! :-)

Saúl Ibarra Corretgé
bettercallsaghul.com

(in the kernel), so it doesn't matter */
pCancelSynchronousIo(h);
SwitchToThread(); /* yield thread control briefly */
h = handle->readfile_thread;
}
}
}


void uv__pipe_unpause_read(uv_pipe_t* handle) {
if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
uv_mutex_unlock(&handle->readfile_mutex);
}
}


void uv__pipe_stop_read(uv_pipe_t* handle) {
handle->flags &= ~UV_HANDLE_READING;
uv__pipe_pause_read((uv_pipe_t*)handle);
uv__pipe_unpause_read((uv_pipe_t*)handle);
}


/* Cleans up uv_pipe_t (server or connection) and all resources associated */
/* with it. */
void uv_pipe_cleanup(uv_loop_t* loop, uv_pipe_t* handle) {
int i;
HANDLE pipeHandle;

uv__pipe_stop_read(handle);

if (handle->name) {
free(handle->name);
handle->name = NULL;
Expand All @@ -689,6 +738,7 @@ void uv_pipe_cleanup(uv_loop_t* loop, uv_pipe_t* handle) {
CloseHandle(handle->handle);
handle->handle = INVALID_HANDLE_VALUE;
}

}


Expand Down Expand Up @@ -867,19 +917,61 @@ static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) {
uv_read_t* req = (uv_read_t*) parameter;
uv_pipe_t* handle = (uv_pipe_t*) req->data;
uv_loop_t* loop = handle->loop;
HANDLE hThread = NULL;
DWORD err;
uv_mutex_t *m = &handle->readfile_mutex;

assert(req != NULL);
assert(req->type == UV_READ);
assert(handle->type == UV_NAMED_PIPE);

if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
uv_mutex_lock(m); /* mutex controls *setting* of readfile_thread */
if (DuplicateHandle(GetCurrentProcess(), GetCurrentThread(),
GetCurrentProcess(), &hThread,
0, TRUE, DUPLICATE_SAME_ACCESS)) {
handle->readfile_thread = hThread;
} else {
hThread = NULL;
}
uv_mutex_unlock(m);
}
restart_readfile:
result = ReadFile(handle->handle,
&uv_zero_,
0,
&bytes,
NULL);
if (!result) {
err = GetLastError();
if (err == ERROR_OPERATION_ABORTED &&
handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
if (handle->flags & UV_HANDLE_READING) {
/* just a brief break to do something else */
handle->readfile_thread = NULL;
/* resume after it is finished */
uv_mutex_lock(m);
handle->readfile_thread = hThread;
uv_mutex_unlock(m);
goto restart_readfile;
} else {
result = 1; /* successfully stopped reading */
}
}
}
if (hThread) {
assert(hThread == handle->readfile_thread);
/* mutex does not control clearing readfile_thread */
handle->readfile_thread = NULL;
uv_mutex_lock(m);
/* only when we hold the mutex lock is it safe to
open or close the handle */
CloseHandle(hThread);
uv_mutex_unlock(m);
}

if (!result) {
SET_REQ_ERROR(req, GetLastError());
SET_REQ_ERROR(req, err);
}

POST_COMPLETION_FOR_REQ(loop, req);
Expand Down Expand Up @@ -1836,6 +1928,8 @@ int uv_pipe_getsockname(const uv_pipe_t* handle, char* buf, size_t* len) {
return UV_EINVAL;
}

uv__pipe_pause_read((uv_pipe_t*)handle); /* cast away const warning */

nt_status = pNtQueryInformationFile(handle->handle,
&io_status,
&tmp_name_info,
Expand All @@ -1846,7 +1940,8 @@ int uv_pipe_getsockname(const uv_pipe_t* handle, char* buf, size_t* len) {
name_info = malloc(name_size);
if (!name_info) {
*len = 0;
return UV_ENOMEM;
err = UV_ENOMEM;
goto cleanup;
}

nt_status = pNtQueryInformationFile(handle->handle,
Expand Down Expand Up @@ -1918,10 +2013,14 @@ int uv_pipe_getsockname(const uv_pipe_t* handle, char* buf, size_t* len) {
buf[addrlen++] = '\0';
*len = addrlen;

return 0;
err = 0;
goto cleanup;

error:
free(name_info);

cleanup:
uv__pipe_unpause_read((uv_pipe_t*)handle); /* cast away const warning */
return err;
}

Expand Down
6 changes: 5 additions & 1 deletion src/win/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,11 @@ int uv_read_stop(uv_stream_t* handle) {
if (handle->type == UV_TTY) {
err = uv_tty_read_stop((uv_tty_t*) handle);
} else {
handle->flags &= ~UV_HANDLE_READING;
if (handle->type == UV_NAMED_PIPE) {
uv__pipe_stop_read((uv_pipe_t*) handle);
} else {
handle->flags &= ~UV_HANDLE_READING;
}
DECREASE_ACTIVE_COUNT(handle->loop, handle);
}

Expand Down
4 changes: 4 additions & 0 deletions src/win/winapi.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ sSleepConditionVariableCS pSleepConditionVariableCS;
sSleepConditionVariableSRW pSleepConditionVariableSRW;
sWakeAllConditionVariable pWakeAllConditionVariable;
sWakeConditionVariable pWakeConditionVariable;
sCancelSynchronousIo pCancelSynchronousIo;


void uv_winapi_init() {
Expand Down Expand Up @@ -156,4 +157,7 @@ void uv_winapi_init() {

pWakeConditionVariable = (sWakeConditionVariable)
GetProcAddress(kernel32_module, "WakeConditionVariable");

pCancelSynchronousIo = (sCancelSynchronousIo)
GetProcAddress(kernel32_module, "CancelSynchronousIo");
}
3 changes: 3 additions & 0 deletions src/win/winapi.h
Original file line number Diff line number Diff line change
Expand Up @@ -4617,6 +4617,8 @@ typedef VOID (WINAPI* sWakeAllConditionVariable)
typedef VOID (WINAPI* sWakeConditionVariable)
(PCONDITION_VARIABLE ConditionVariable);

typedef BOOL (WINAPI* sCancelSynchronousIo)
(HANDLE hThread);

/* Ntdll function pointers */
extern sRtlNtStatusToDosError pRtlNtStatusToDosError;
Expand Down Expand Up @@ -4644,5 +4646,6 @@ extern sSleepConditionVariableCS pSleepConditionVariableCS;
extern sSleepConditionVariableSRW pSleepConditionVariableSRW;
extern sWakeAllConditionVariable pWakeAllConditionVariable;
extern sWakeConditionVariable pWakeConditionVariable;
extern sCancelSynchronousIo pCancelSynchronousIo;

#endif /* UV_WIN_WINAPI_H_ */
2 changes: 2 additions & 0 deletions test/test-list.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ TEST_DECLARE (pipe_connect_bad_name)
TEST_DECLARE (pipe_connect_to_file)
TEST_DECLARE (pipe_getsockname)
TEST_DECLARE (pipe_getsockname_abstract)
TEST_DECLARE (pipe_getsockname_blocking)
TEST_DECLARE (pipe_sendmsg)
TEST_DECLARE (pipe_server_close)
TEST_DECLARE (connection_fail)
Expand Down Expand Up @@ -395,6 +396,7 @@ TASK_LIST_START
TEST_ENTRY (pipe_listen_without_bind)
TEST_ENTRY (pipe_getsockname)
TEST_ENTRY (pipe_getsockname_abstract)
TEST_ENTRY (pipe_getsockname_blocking)
TEST_ENTRY (pipe_sendmsg)

TEST_ENTRY (connection_fail)
Expand Down
58 changes: 58 additions & 0 deletions test/test-pipe-getsockname.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

#ifndef _WIN32
# include <unistd.h> /* close */
#else
# include <fcntl.h>
#endif


Expand Down Expand Up @@ -120,3 +122,59 @@ TEST_IMPL(pipe_getsockname_abstract) {
#endif
}

TEST_IMPL(pipe_getsockname_blocking) {
#ifdef _WIN32
uv_pipe_t reader;
HANDLE readh, writeh;
int readfd;
char buf1[1024], buf2[1024];
size_t len1, len2;
int r;

r = CreatePipe(&readh, &writeh, NULL, 65536);
ASSERT(r != 0);

r = uv_pipe_init(uv_default_loop(), &reader, 0);
ASSERT(r == 0);
readfd = _open_osfhandle((intptr_t)readh, _O_RDONLY);
ASSERT(r != -1);
r = uv_pipe_open(&reader, readfd);
ASSERT(r == 0);
r = uv_read_start((uv_stream_t*)&reader, NULL, NULL);
ASSERT(r == 0);
Sleep(100);
r = uv_read_stop((uv_stream_t*)&reader);
ASSERT(r == 0);

len1 = sizeof buf1;
r = uv_pipe_getsockname(&reader, buf1, &len1);
ASSERT(r == 0);

r = uv_read_start((uv_stream_t*)&reader, NULL, NULL);
ASSERT(r == 0);
Sleep(100);

len2 = sizeof buf2;
r = uv_pipe_getsockname(&reader, buf2, &len2);
ASSERT(r == 0);

r = uv_read_stop((uv_stream_t*)&reader);
ASSERT(r == 0);

ASSERT(len1 == len2);
ASSERT(memcmp(buf1, buf2, len1) == 0);

close_cb_called = 0;
uv_close((uv_handle_t*)&reader, close_cb);

uv_run(uv_default_loop(), UV_RUN_DEFAULT);

ASSERT(close_cb_called == 1);

_close(readfd);
CloseHandle(writeh);
#endif

MAKE_VALGRIND_HAPPY();
return 0;
}