Skip to content

Commit

Permalink
win,pipe: restore compatibility with the old IPC framing protocol
Browse files Browse the repository at this point in the history
Fixes: libuv#1922
Refs: nodejs/node#21671
PR-URL: libuv#1923
Reviewed-By: Bartosz Sosnowski <[email protected]>
Reviewed-By: Colin Ihrig <[email protected]>
  • Loading branch information
piscisaureus committed Jul 21, 2018
1 parent 31a06f2 commit 27ba662
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 74 deletions.
12 changes: 10 additions & 2 deletions src/win/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,15 @@ extern UV_THREAD_LOCAL int uv__crt_assert_enabled;
* TCP
*/

typedef enum {
UV__IPC_SOCKET_XFER_NONE = 0,
UV__IPC_SOCKET_XFER_TCP_CONNECTION,
UV__IPC_SOCKET_XFER_TCP_SERVER
} uv__ipc_socket_xfer_type_t;

typedef struct {
WSAPROTOCOL_INFOW socket_info;
uint32_t delayed_error;
uint32_t flags; /* Either zero or UV_HANDLE_CONNECTION. */
} uv__ipc_socket_xfer_info_t;

int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb);
Expand All @@ -89,8 +94,11 @@ void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle);

int uv__tcp_xfer_export(uv_tcp_t* handle,
int pid,
uv__ipc_socket_xfer_type_t* xfer_type,
uv__ipc_socket_xfer_info_t* xfer_info);
int uv__tcp_xfer_import(uv_tcp_t* tcp,
uv__ipc_socket_xfer_type_t xfer_type,
uv__ipc_socket_xfer_info_t* xfer_info);
int uv__tcp_xfer_import(uv_tcp_t* tcp, uv__ipc_socket_xfer_info_t* xfer_info);


/*
Expand Down
172 changes: 113 additions & 59 deletions src/win/pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@
#include <stdlib.h>
#include <string.h>

#include "uv.h"
#include "internal.h"
#include "handle-inl.h"
#include "stream-inl.h"
#include "internal.h"
#include "req-inl.h"
#include "stream-inl.h"
#include "uv-common.h"
#include "uv.h"

#include <aclapi.h>
#include <accctrl.h>
Expand All @@ -52,19 +53,36 @@ static const int pipe_prefix_len = sizeof(pipe_prefix) - 1;

/* IPC incoming xfer queue item. */
typedef struct {
uv__ipc_socket_xfer_type_t xfer_type;
uv__ipc_socket_xfer_info_t xfer_info;
QUEUE member;
} uv__ipc_xfer_queue_item_t;

/* IPC frame types. */
enum { UV__IPC_DATA_FRAME = 0, UV__IPC_XFER_FRAME = 1 };
/* IPC frame header flags. */
/* clang-format off */
enum {
UV__IPC_FRAME_HAS_DATA = 0x01,
UV__IPC_FRAME_HAS_SOCKET_XFER = 0x02,
UV__IPC_FRAME_XFER_IS_TCP_CONNECTION = 0x04,
/* These are combinations of the flags above. */
UV__IPC_FRAME_XFER_FLAGS = 0x06,
UV__IPC_FRAME_VALID_FLAGS = 0x07
};
/* clang-format on */

/* IPC frame header. */
typedef struct {
uint32_t type;
uint32_t payload_length;
uint32_t flags;
uint32_t reserved1; /* Ignored. */
uint32_t data_length; /* Must be zero if there is no data. */
uint32_t reserved2; /* Must be zero. */
} uv__ipc_frame_header_t;

/* To implement the IPC protocol correctly, these structures must have exactly
* the right size. */
STATIC_ASSERT(sizeof(uv__ipc_frame_header_t) == 16);
STATIC_ASSERT(sizeof(uv__ipc_socket_xfer_info_t) == 632);

/* Coalesced write request. */
typedef struct {
uv_write_t req; /* Internal heap-allocated write request. */
Expand Down Expand Up @@ -878,7 +896,8 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
server->pipe.conn.ipc_xfer_queue_length--;
item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member);

err = uv__tcp_xfer_import((uv_tcp_t*) client, &item->xfer_info);
err = uv__tcp_xfer_import(
(uv_tcp_t*) client, item->xfer_type, &item->xfer_info);
if (err != 0)
return err;

Expand Down Expand Up @@ -1458,10 +1477,10 @@ int uv__pipe_write_ipc(uv_loop_t* loop,
uv_buf_t stack_bufs[6];
uv_buf_t* bufs;
size_t buf_count, buf_index;
uv__ipc_frame_header_t xfer_frame_header;
uv__ipc_frame_header_t frame_header;
uv__ipc_socket_xfer_type_t xfer_type = UV__IPC_SOCKET_XFER_NONE;
uv__ipc_socket_xfer_info_t xfer_info;
uv__ipc_frame_header_t data_frame_header;
size_t data_length;
uint64_t data_length;
size_t i;
int err;

Expand All @@ -1472,29 +1491,27 @@ int uv__pipe_write_ipc(uv_loop_t* loop,
if (data_length > UINT32_MAX)
return WSAENOBUFS; /* Maps to UV_ENOBUFS. */

/* Prepare xfer frame payload. */
if (send_handle) {
/* Prepare the frame's socket xfer payload. */
if (send_handle != NULL) {
uv_tcp_t* send_tcp_handle = (uv_tcp_t*) send_handle;

/* Verify that `send_handle` it is indeed a tcp handle. */
if (send_tcp_handle->type != UV_TCP)
return ERROR_NOT_SUPPORTED;

/* Export the tcp handle. */
err = uv__tcp_xfer_export(
send_tcp_handle, uv__pipe_get_ipc_remote_pid(handle), &xfer_info);
err = uv__tcp_xfer_export(send_tcp_handle,
uv__pipe_get_ipc_remote_pid(handle),
&xfer_type,
&xfer_info);
if (err != 0)
return err;
}

/* Compute the number of uv_buf_t's required. */
buf_count = 0;
if (send_handle != NULL) {
buf_count += 2; /* One for the frame header, one for the payload. */
}
if (data_buf_count > 0) {
buf_count += 1 + data_buf_count; /* One extra for the frame header. */
}
buf_count = 1 + data_buf_count; /* Frame header and data buffers. */
if (send_handle != NULL)
buf_count += 1; /* One extra for the socket xfer information. */

/* Use the on-stack buffer array if it is big enough; otherwise allocate
* space for it on the heap. */
Expand All @@ -1509,25 +1526,32 @@ int uv__pipe_write_ipc(uv_loop_t* loop,
}
buf_index = 0;

if (send_handle != NULL) {
/* Add xfer frame header. */
xfer_frame_header.type = UV__IPC_XFER_FRAME;
xfer_frame_header.payload_length = sizeof xfer_info;
bufs[buf_index++] =
uv_buf_init((char*) &xfer_frame_header, sizeof xfer_frame_header);
/* Initialize frame header and add it to the buffers list. */
memset(&frame_header, 0, sizeof frame_header);
bufs[buf_index++] = uv_buf_init((char*) &frame_header, sizeof frame_header);

/* Add xfer frame payload. */
if (send_handle != NULL) {
/* Add frame header flags. */
switch (xfer_type) {
case UV__IPC_SOCKET_XFER_TCP_CONNECTION:
frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER |
UV__IPC_FRAME_XFER_IS_TCP_CONNECTION;
break;
case UV__IPC_SOCKET_XFER_TCP_SERVER:
frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER;
break;
default:
assert(0); // Unreachable.
}
/* Add xfer info buffer. */
bufs[buf_index++] = uv_buf_init((char*) &xfer_info, sizeof xfer_info);
}

if (data_length > 0) {
/* Add data frame header. */
data_frame_header.type = UV__IPC_DATA_FRAME;
data_frame_header.payload_length = (uint32_t) data_length;
bufs[buf_index++] =
uv_buf_init((char*) &data_frame_header, sizeof data_frame_header);

/* Add data buffers. */
/* Update frame header. */
frame_header.flags |= UV__IPC_FRAME_HAS_DATA;
frame_header.data_length = (uint32_t) data_length;
/* Add data buffers to buffers list. */
for (i = 0; i < data_buf_count; i++)
bufs[buf_index++] = data_bufs[i];
}
Expand Down Expand Up @@ -1601,14 +1625,18 @@ static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,


static void uv__pipe_queue_ipc_xfer_info(
uv_pipe_t* handle, uv__ipc_socket_xfer_info_t* xfer_info) {
uv_pipe_t* handle,
uv__ipc_socket_xfer_type_t xfer_type,
uv__ipc_socket_xfer_info_t* xfer_info) {
uv__ipc_xfer_queue_item_t* item;

item = (uv__ipc_xfer_queue_item_t*) uv__malloc(sizeof(*item));
if (item == NULL)
uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");

memcpy(&item->xfer_info, xfer_info, sizeof(item->xfer_info));
item->xfer_type = xfer_type;
item->xfer_info = *xfer_info;

QUEUE_INSERT_TAIL(&handle->pipe.conn.ipc_xfer_queue, &item->member);
handle->pipe.conn.ipc_xfer_queue_length++;
}
Expand Down Expand Up @@ -1678,7 +1706,7 @@ static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
int err;

if (*data_remaining > 0) {
/* Read data frame payload. */
/* Read frame data payload. */
DWORD bytes_read =
uv__pipe_read_data(loop, handle, *data_remaining, *data_remaining);
*data_remaining -= bytes_read;
Expand All @@ -1687,6 +1715,8 @@ static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
} else {
/* Start of a new IPC frame. */
uv__ipc_frame_header_t frame_header;
uint32_t xfer_flags;
uv__ipc_socket_xfer_type_t xfer_type;
uv__ipc_socket_xfer_info_t xfer_info;

/* Read the IPC frame header. */
Expand All @@ -1695,33 +1725,57 @@ static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
if (err)
goto error;

if (frame_header.type == UV__IPC_DATA_FRAME) {
/* Data frame: capture payload length. Actual data will be read in
* subsequent call to uv__pipe_read_ipc(). */
*data_remaining = frame_header.payload_length;
/* Validate that flags are valid. */
if ((frame_header.flags & ~UV__IPC_FRAME_VALID_FLAGS) != 0)
goto invalid;
/* Validate that reserved2 is zero. */
if (frame_header.reserved2 != 0)
goto invalid;

/* Parse xfer flags. */
xfer_flags = frame_header.flags & UV__IPC_FRAME_XFER_FLAGS;
if (xfer_flags & UV__IPC_FRAME_HAS_SOCKET_XFER) {
/* Socket coming -- determine the type. */
xfer_type = xfer_flags & UV__IPC_FRAME_XFER_IS_TCP_CONNECTION
? UV__IPC_SOCKET_XFER_TCP_CONNECTION
: UV__IPC_SOCKET_XFER_TCP_SERVER;
} else if (xfer_flags == 0) {
/* No socket. */
xfer_type = UV__IPC_SOCKET_XFER_NONE;
} else {
/* Invalid flags. */
goto invalid;
}

/* Return number of bytes read. */
return sizeof frame_header;
/* Parse data frame information. */
if (frame_header.flags & UV__IPC_FRAME_HAS_DATA) {
*data_remaining = frame_header.data_length;
} else if (frame_header.data_length != 0) {
/* Data length greater than zero but data flag not set -- invalid. */
goto invalid;
}

} else if (frame_header.type == UV__IPC_XFER_FRAME) {
/* Xfer frame: read the payload. */
assert(frame_header.payload_length == sizeof xfer_info);
err =
uv__pipe_read_exactly(handle->handle, &xfer_info, sizeof xfer_info);
if (err)
goto error;
/* If no socket xfer info follows, return here. Data will be read in a
* subsequent invocation of uv__pipe_read_ipc(). */
if (xfer_type == UV__IPC_SOCKET_XFER_NONE)
return sizeof frame_header; /* Number of bytes read. */

/* Store the pending socket info. */
uv__pipe_queue_ipc_xfer_info(handle, &xfer_info);
/* Read transferred socket information. */
err = uv__pipe_read_exactly(handle->handle, &xfer_info, sizeof xfer_info);
if (err)
goto error;

/* Return number of bytes read. */
return sizeof frame_header + sizeof xfer_info;
}
/* Store the pending socket info. */
uv__pipe_queue_ipc_xfer_info(handle, xfer_type, &xfer_info);

/* Invalid frame. */
err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */
/* Return number of bytes read. */
return sizeof frame_header + sizeof xfer_info;
}

invalid:
/* Invalid frame. */
err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */

error:
uv_pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
return 0; /* Break out of read loop. */
Expand Down
34 changes: 21 additions & 13 deletions src/win/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1191,8 +1191,12 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle,

int uv__tcp_xfer_export(uv_tcp_t* handle,
int target_pid,
uv__ipc_socket_xfer_type_t* xfer_type,
uv__ipc_socket_xfer_info_t* xfer_info) {
if (!(handle->flags & UV_HANDLE_CONNECTION)) {
if (handle->flags & UV_HANDLE_CONNECTION) {
*xfer_type = UV__IPC_SOCKET_XFER_TCP_CONNECTION;
} else {
*xfer_type = UV__IPC_SOCKET_XFER_TCP_SERVER;
/* We're about to share the socket with another process. Because this is a
* listening socket, we assume that the other process will be accepting
* connections on it. Thus, before sharing the socket with another process,
Expand All @@ -1208,12 +1212,9 @@ int uv__tcp_xfer_export(uv_tcp_t* handle,
}
}

if (WSADuplicateSocketW(
handle->socket, target_pid, &xfer_info->socket_info)) {
if (WSADuplicateSocketW(handle->socket, target_pid, &xfer_info->socket_info))
return WSAGetLastError();
}
xfer_info->delayed_error = handle->delayed_error;
xfer_info->flags = handle->flags & UV_HANDLE_CONNECTION;

/* Mark the local copy of the handle as 'shared' so we behave in a way that's
* friendly to the process(es) that we share the socket with. */
Expand All @@ -1223,14 +1224,21 @@ int uv__tcp_xfer_export(uv_tcp_t* handle,
}


int uv__tcp_xfer_import(uv_tcp_t* tcp, uv__ipc_socket_xfer_info_t* xfer_info) {
int uv__tcp_xfer_import(uv_tcp_t* tcp,
uv__ipc_socket_xfer_type_t xfer_type,
uv__ipc_socket_xfer_info_t* xfer_info) {
int err;
SOCKET socket = WSASocketW(FROM_PROTOCOL_INFO,
FROM_PROTOCOL_INFO,
FROM_PROTOCOL_INFO,
&xfer_info->socket_info,
0,
WSA_FLAG_OVERLAPPED);
SOCKET socket;

assert(xfer_type == UV__IPC_SOCKET_XFER_TCP_SERVER ||
xfer_type == UV__IPC_SOCKET_XFER_TCP_CONNECTION);

socket = WSASocketW(FROM_PROTOCOL_INFO,
FROM_PROTOCOL_INFO,
FROM_PROTOCOL_INFO,
&xfer_info->socket_info,
0,
WSA_FLAG_OVERLAPPED);

if (socket == INVALID_SOCKET) {
return WSAGetLastError();
Expand All @@ -1246,7 +1254,7 @@ int uv__tcp_xfer_import(uv_tcp_t* tcp, uv__ipc_socket_xfer_info_t* xfer_info) {
tcp->delayed_error = xfer_info->delayed_error;
tcp->flags |= UV_HANDLE_BOUND | UV_HANDLE_SHARED_TCP_SOCKET;

if (xfer_info->flags & UV_HANDLE_CONNECTION) {
if (xfer_type == UV__IPC_SOCKET_XFER_TCP_CONNECTION) {
uv_connection_init((uv_stream_t*)tcp);
tcp->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
}
Expand Down

0 comments on commit 27ba662

Please sign in to comment.