Use QIOChannel, QIOChannelSocket and QIONetListener.
This allows net/stream to use all the available parameters provided by
SocketAddress.
Signed-off-by: Laurent Vivier <lvivier@redhat.com>
Acked-by: Michael S. Tsirkin <mst@redhat.com>
---
net/stream.c | 492 +++++++++++++++++-------------------------------
qemu-options.hx | 4 +-
2 files changed, 178 insertions(+), 318 deletions(-)
diff --git a/net/stream.c b/net/stream.c
index 884f473018da..54c67e14d240 100644
--- a/net/stream.c
+++ b/net/stream.c
@@ -35,48 +35,36 @@
#include "qemu/iov.h"
#include "qemu/main-loop.h"
#include "qemu/cutils.h"
+#include "io/channel.h"
+#include "io/channel-socket.h"
+#include "io/net-listener.h"
typedef struct NetStreamState {
NetClientState nc;
- int listen_fd;
- int fd;
+ QIOChannel *listen_ioc;
+ QIONetListener *listener;
+ QIOChannel *ioc;
+ guint ioc_read_tag;
+ guint ioc_write_tag;
SocketReadState rs;
unsigned int send_index; /* number of bytes sent*/
- bool read_poll; /* waiting to receive data? */
- bool write_poll; /* waiting to transmit data? */
} NetStreamState;
-static void net_stream_send(void *opaque);
-static void net_stream_accept(void *opaque);
-static void net_stream_writable(void *opaque);
+static void net_stream_listen(QIONetListener *listener,
+ QIOChannelSocket *cioc,
+ void *opaque);
-static void net_stream_update_fd_handler(NetStreamState *s)
+static gboolean net_stream_writable(QIOChannel *ioc,
+ GIOCondition condition,
+ gpointer data)
{
- qemu_set_fd_handler(s->fd,
- s->read_poll ? net_stream_send : NULL,
- s->write_poll ? net_stream_writable : NULL,
- s);
-}
-
-static void net_stream_read_poll(NetStreamState *s, bool enable)
-{
- s->read_poll = enable;
- net_stream_update_fd_handler(s);
-}
-
-static void net_stream_write_poll(NetStreamState *s, bool enable)
-{
- s->write_poll = enable;
- net_stream_update_fd_handler(s);
-}
-
-static void net_stream_writable(void *opaque)
-{
- NetStreamState *s = opaque;
+ NetStreamState *s = data;
- net_stream_write_poll(s, false);
+ s->ioc_write_tag = 0;
qemu_flush_queued_packets(&s->nc);
+
+ return G_SOURCE_REMOVE;
}
static ssize_t net_stream_receive(NetClientState *nc, const uint8_t *buf,
@@ -93,13 +81,15 @@ static ssize_t net_stream_receive(NetClientState *nc, const uint8_t *buf,
.iov_len = size,
},
};
+ struct iovec local_iov[2];
+ unsigned int nlocal_iov;
size_t remaining;
ssize_t ret;
remaining = iov_size(iov, 2) - s->send_index;
- ret = iov_send(s->fd, iov, 2, s->send_index, remaining);
-
- if (ret == -1 && errno == EAGAIN) {
+ nlocal_iov = iov_copy(local_iov, 2, iov, 2, s->send_index, remaining);
+ ret = qio_channel_writev(s->ioc, local_iov, nlocal_iov, NULL);
+ if (ret == QIO_CHANNEL_ERR_BLOCK) {
ret = 0; /* handled further down */
}
if (ret == -1) {
@@ -108,19 +98,25 @@ static ssize_t net_stream_receive(NetClientState *nc, const uint8_t *buf,
}
if (ret < (ssize_t)remaining) {
s->send_index += ret;
- net_stream_write_poll(s, true);
+ s->ioc_write_tag = qio_channel_add_watch(s->ioc, G_IO_OUT,
+ net_stream_writable, s, NULL);
return 0;
}
s->send_index = 0;
return size;
}
+static gboolean net_stream_send(QIOChannel *ioc,
+ GIOCondition condition,
+ gpointer data);
+
static void net_stream_send_completed(NetClientState *nc, ssize_t len)
{
NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
- if (!s->read_poll) {
- net_stream_read_poll(s, true);
+ if (!s->ioc_read_tag) {
+ s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN,
+ net_stream_send, s, NULL);
}
}
@@ -131,19 +127,24 @@ static void net_stream_rs_finalize(SocketReadState *rs)
if (qemu_send_packet_async(&s->nc, rs->buf,
rs->packet_len,
net_stream_send_completed) == 0) {
- net_stream_read_poll(s, false);
+ if (s->ioc_read_tag) {
+ g_source_remove(s->ioc_read_tag);
+ s->ioc_read_tag = 0;
+ }
}
}
-static void net_stream_send(void *opaque)
+static gboolean net_stream_send(QIOChannel *ioc,
+ GIOCondition condition,
+ gpointer data)
{
- NetStreamState *s = opaque;
+ NetStreamState *s = data;
int size;
int ret;
- uint8_t buf1[NET_BUFSIZE];
- const uint8_t *buf;
+ char buf1[NET_BUFSIZE];
+ const char *buf;
- size = recv(s->fd, buf1, sizeof(buf1), 0);
+ size = qio_channel_read(s->ioc, buf1, sizeof(buf1), NULL);
if (size < 0) {
if (errno != EWOULDBLOCK) {
goto eoc;
@@ -151,51 +152,63 @@ static void net_stream_send(void *opaque)
} else if (size == 0) {
/* end of connection */
eoc:
- net_stream_read_poll(s, false);
- net_stream_write_poll(s, false);
- if (s->listen_fd != -1) {
- qemu_set_fd_handler(s->listen_fd, net_stream_accept, NULL, s);
+ s->ioc_read_tag = 0;
+ if (s->ioc_write_tag) {
+ g_source_remove(s->ioc_write_tag);
+ s->ioc_write_tag = 0;
}
- closesocket(s->fd);
+ if (s->listener) {
+ qio_net_listener_set_client_func(s->listener, net_stream_listen,
+ s, NULL);
+ }
+ object_unref(OBJECT(s->ioc));
+ s->ioc = NULL;
- s->fd = -1;
net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
s->nc.link_down = true;
qemu_set_info_str(&s->nc, "");
- return;
+ return G_SOURCE_REMOVE;
}
buf = buf1;
- ret = net_fill_rstate(&s->rs, buf, size);
+ ret = net_fill_rstate(&s->rs, (const uint8_t *)buf, size);
if (ret == -1) {
goto eoc;
}
+
+ return G_SOURCE_CONTINUE;
}
static void net_stream_cleanup(NetClientState *nc)
{
NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
- if (s->fd != -1) {
- net_stream_read_poll(s, false);
- net_stream_write_poll(s, false);
- close(s->fd);
- s->fd = -1;
+ if (s->ioc) {
+ if (QIO_CHANNEL_SOCKET(s->ioc)->fd != -1) {
+ if (s->ioc_read_tag) {
+ g_source_remove(s->ioc_read_tag);
+ s->ioc_read_tag = 0;
+ }
+ if (s->ioc_write_tag) {
+ g_source_remove(s->ioc_write_tag);
+ s->ioc_write_tag = 0;
+ }
+ }
+ object_unref(OBJECT(s->ioc));
+ s->ioc = NULL;
}
- if (s->listen_fd != -1) {
- qemu_set_fd_handler(s->listen_fd, NULL, NULL, NULL);
- closesocket(s->listen_fd);
- s->listen_fd = -1;
+ if (s->listen_ioc) {
+ if (s->listener) {
+ qio_net_listener_disconnect(s->listener);
+ object_unref(OBJECT(s->listener));
+ s->listener = NULL;
+ }
+ object_unref(OBJECT(s->listen_ioc));
+ s->listen_ioc = NULL;
}
}
-static void net_stream_connect(void *opaque)
-{
- NetStreamState *s = opaque;
- net_stream_read_poll(s, true);
-}
-
static NetClientInfo net_stream_info = {
.type = NET_CLIENT_DRIVER_STREAM,
.size = sizeof(NetStreamState),
@@ -203,76 +216,66 @@ static NetClientInfo net_stream_info = {
.cleanup = net_stream_cleanup,
};
-static NetStreamState *net_stream_fd_init(NetClientState *peer,
- const char *model,
- const char *name,
- int fd, int is_connected)
+static void net_stream_listen(QIONetListener *listener,
+ QIOChannelSocket *cioc,
+ void *opaque)
{
- NetClientState *nc;
- NetStreamState *s;
-
- nc = qemu_new_net_client(&net_stream_info, peer, model, name);
+ NetStreamState *s = opaque;
+ SocketAddress *addr;
+ char *uri;
- qemu_set_info_str(nc, "fd=%d", fd);
+ object_ref(OBJECT(cioc));
- s = DO_UPCAST(NetStreamState, nc, nc);
+ qio_net_listener_set_client_func(s->listener, NULL, s, NULL);
- s->fd = fd;
- s->listen_fd = -1;
- net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
+ s->ioc = QIO_CHANNEL(cioc);
+ qio_channel_set_name(s->ioc, "stream-server");
+ s->nc.link_down = false;
- /* Disable Nagle algorithm on TCP sockets to reduce latency */
- socket_set_nodelay(fd);
+ s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send,
+ s, NULL);
- if (is_connected) {
- net_stream_connect(s);
+ if (cioc->localAddr.ss_family == AF_UNIX) {
+ addr = qio_channel_socket_get_local_address(cioc, NULL);
} else {
- qemu_set_fd_handler(s->fd, NULL, net_stream_connect, s);
+ addr = qio_channel_socket_get_remote_address(cioc, NULL);
}
- return s;
+ g_assert(addr != NULL);
+ uri = socket_uri(addr);
+ qemu_set_info_str(&s->nc, uri);
+ g_free(uri);
+ qapi_free_SocketAddress(addr);
}
-static void net_stream_accept(void *opaque)
+static void net_stream_server_listening(QIOTask *task, gpointer opaque)
{
NetStreamState *s = opaque;
- struct sockaddr_storage saddr;
- socklen_t len;
- int fd;
-
- for (;;) {
- len = sizeof(saddr);
- fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len);
- if (fd < 0 && errno != EINTR) {
- return;
- } else if (fd >= 0) {
- qemu_set_fd_handler(s->listen_fd, NULL, NULL, NULL);
- break;
- }
- }
+ QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(s->listen_ioc);
+ SocketAddress *addr;
+ int ret;
- s->fd = fd;
- s->nc.link_down = false;
- net_stream_connect(s);
- switch (saddr.ss_family) {
- case AF_INET: {
- struct sockaddr_in *saddr_in = (struct sockaddr_in *)&saddr;
-
- qemu_set_info_str(&s->nc, "connection from %s:%d",
- inet_ntoa(saddr_in->sin_addr),
- ntohs(saddr_in->sin_port));
- break;
+ if (listen_sioc->fd < 0) {
+ qemu_set_info_str(&s->nc, "connection error");
+ return;
}
- case AF_UNIX: {
- struct sockaddr_un saddr_un;
- len = sizeof(saddr_un);
- getsockname(s->listen_fd, (struct sockaddr *)&saddr_un, &len);
- qemu_set_info_str(&s->nc, "connect from %s", saddr_un.sun_path);
- break;
- }
- default:
- g_assert_not_reached();
+ addr = qio_channel_socket_get_local_address(listen_sioc, NULL);
+ g_assert(addr != NULL);
+ ret = qemu_socket_try_set_nonblock(listen_sioc->fd);
+ if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
+ qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
+ addr->u.fd.str, -ret);
+ return;
}
+ g_assert(ret == 0);
+ qapi_free_SocketAddress(addr);
+
+ s->nc.link_down = true;
+ s->listener = qio_net_listener_new();
+
+ net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
+ qio_net_listener_set_client_func(s->listener, net_stream_listen, s, NULL);
+ qio_net_listener_add(s->listener, listen_sioc);
}
static int net_stream_server_init(NetClientState *peer,
@@ -283,105 +286,61 @@ static int net_stream_server_init(NetClientState *peer,
{
NetClientState *nc;
NetStreamState *s;
- int fd, ret;
+ QIOChannelSocket *listen_sioc = qio_channel_socket_new();
- switch (addr->type) {
- case SOCKET_ADDRESS_TYPE_INET: {
- struct sockaddr_in saddr_in;
-
- if (convert_host_port(&saddr_in, addr->u.inet.host, addr->u.inet.port,
- errp) < 0) {
- return -1;
- }
-
- fd = qemu_socket(PF_INET, SOCK_STREAM, 0);
- if (fd < 0) {
- error_setg_errno(errp, errno, "can't create stream socket");
- return -1;
- }
- qemu_socket_set_nonblock(fd);
+ nc = qemu_new_net_client(&net_stream_info, peer, model, name);
+ s = DO_UPCAST(NetStreamState, nc, nc);
- socket_set_fast_reuse(fd);
+ s->listen_ioc = QIO_CHANNEL(listen_sioc);
+ qio_channel_socket_listen_async(listen_sioc, addr, 0,
+ net_stream_server_listening, s,
+ NULL, NULL);
- ret = bind(fd, (struct sockaddr *)&saddr_in, sizeof(saddr_in));
- if (ret < 0) {
- error_setg_errno(errp, errno, "can't bind ip=%s to socket",
- inet_ntoa(saddr_in.sin_addr));
- closesocket(fd);
- return -1;
- }
- break;
- }
- case SOCKET_ADDRESS_TYPE_UNIX: {
- struct sockaddr_un saddr_un;
-
- ret = unlink(addr->u.q_unix.path);
- if (ret < 0 && errno != ENOENT) {
- error_setg_errno(errp, errno, "failed to unlink socket %s",
- addr->u.q_unix.path);
- return -1;
- }
+ return 0;
+}
- saddr_un.sun_family = PF_UNIX;
- ret = snprintf(saddr_un.sun_path, sizeof(saddr_un.sun_path), "%s",
- addr->u.q_unix.path);
- if (ret < 0 || ret >= sizeof(saddr_un.sun_path)) {
- error_setg(errp, "UNIX socket path '%s' is too long",
- addr->u.q_unix.path);
- error_append_hint(errp, "Path must be less than %zu bytes\n",
- sizeof(saddr_un.sun_path));
- return -1;
- }
+static void net_stream_client_connected(QIOTask *task, gpointer opaque)
+{
+ NetStreamState *s = opaque;
+ QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(s->ioc);
+ SocketAddress *addr;
+ gchar *uri;
+ int ret;
- fd = qemu_socket(PF_UNIX, SOCK_STREAM, 0);
- if (fd < 0) {
- error_setg_errno(errp, errno, "can't create stream socket");
- return -1;
- }
- qemu_socket_set_nonblock(fd);
-
- ret = bind(fd, (struct sockaddr *)&saddr_un, sizeof(saddr_un));
- if (ret < 0) {
- error_setg_errno(errp, errno, "can't create socket with path: %s",
- saddr_un.sun_path);
- closesocket(fd);
- return -1;
- }
- break;
- }
- case SOCKET_ADDRESS_TYPE_FD:
- fd = monitor_fd_param(monitor_cur(), addr->u.fd.str, errp);
- if (fd == -1) {
- return -1;
- }
- ret = qemu_socket_try_set_nonblock(fd);
- if (ret < 0) {
- error_setg_errno(errp, -ret, "%s: Can't use file descriptor %d",
- name, fd);
- return -1;
- }
- break;
- default:
- error_setg(errp, "only support inet or fd type");
- return -1;
+ if (sioc->fd < 0) {
+ qemu_set_info_str(&s->nc, "connection error");
+ goto error;
}
- ret = listen(fd, 0);
- if (ret < 0) {
- error_setg_errno(errp, errno, "can't listen on socket");
- closesocket(fd);
- return -1;
+ addr = qio_channel_socket_get_remote_address(sioc, NULL);
+ g_assert(addr != NULL);
+ uri = socket_uri(addr);
+ qemu_set_info_str(&s->nc, uri);
+ g_free(uri);
+
+ ret = qemu_socket_try_set_nonblock(sioc->fd);
+ if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
+ qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
+ addr->u.fd.str, -ret);
+ qapi_free_SocketAddress(addr);
+ goto error;
}
+ g_assert(ret == 0);
- nc = qemu_new_net_client(&net_stream_info, peer, model, name);
- s = DO_UPCAST(NetStreamState, nc, nc);
- s->fd = -1;
- s->listen_fd = fd;
- s->nc.link_down = true;
net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
- qemu_set_fd_handler(s->listen_fd, net_stream_accept, NULL, s);
- return 0;
+ /* Disable Nagle algorithm on TCP sockets to reduce latency */
+ qio_channel_set_delay(s->ioc, false);
+
+ s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send,
+ s, NULL);
+ s->nc.link_down = false;
+ qapi_free_SocketAddress(addr);
+
+ return;
+error:
+ object_unref(OBJECT(s->ioc));
+ s->ioc = NULL;
}
static int net_stream_client_init(NetClientState *peer,
@@ -391,118 +350,19 @@ static int net_stream_client_init(NetClientState *peer,
Error **errp)
{
NetStreamState *s;
- struct sockaddr_in saddr_in;
- struct sockaddr_un saddr_un;
- int fd, connected, ret;
-
- switch (addr->type) {
- case SOCKET_ADDRESS_TYPE_INET:
- if (convert_host_port(&saddr_in, addr->u.inet.host, addr->u.inet.port,
- errp) < 0) {
- return -1;
- }
+ NetClientState *nc;
+ QIOChannelSocket *sioc = qio_channel_socket_new();
- fd = qemu_socket(PF_INET, SOCK_STREAM, 0);
- if (fd < 0) {
- error_setg_errno(errp, errno, "can't create stream socket");
- return -1;
- }
- qemu_socket_set_nonblock(fd);
-
- connected = 0;
- for (;;) {
- ret = connect(fd, (struct sockaddr *)&saddr_in, sizeof(saddr_in));
- if (ret < 0) {
- if (errno == EINTR || errno == EWOULDBLOCK) {
- /* continue */
- } else if (errno == EINPROGRESS ||
- errno == EALREADY) {
- break;
- } else {
- error_setg_errno(errp, errno, "can't connect socket");
- closesocket(fd);
- return -1;
- }
- } else {
- connected = 1;
- break;
- }
- }
- break;
- case SOCKET_ADDRESS_TYPE_UNIX:
- saddr_un.sun_family = PF_UNIX;
- ret = snprintf(saddr_un.sun_path, sizeof(saddr_un.sun_path), "%s",
- addr->u.q_unix.path);
- if (ret < 0 || ret >= sizeof(saddr_un.sun_path)) {
- error_setg(errp, "UNIX socket path '%s' is too long",
- addr->u.q_unix.path);
- error_append_hint(errp, "Path must be less than %zu bytes\n",
- sizeof(saddr_un.sun_path));
- return -1;
- }
+ nc = qemu_new_net_client(&net_stream_info, peer, model, name);
+ s = DO_UPCAST(NetStreamState, nc, nc);
- fd = qemu_socket(PF_UNIX, SOCK_STREAM, 0);
- if (fd < 0) {
- error_setg_errno(errp, errno, "can't create stream socket");
- return -1;
- }
- qemu_socket_set_nonblock(fd);
-
- connected = 0;
- for (;;) {
- ret = connect(fd, (struct sockaddr *)&saddr_un, sizeof(saddr_un));
- if (ret < 0) {
- if (errno == EINTR || errno == EWOULDBLOCK) {
- /* continue */
- } else if (errno == EAGAIN ||
- errno == EALREADY) {
- break;
- } else {
- error_setg_errno(errp, errno, "can't connect socket");
- closesocket(fd);
- return -1;
- }
- } else {
- connected = 1;
- break;
- }
- }
- break;
- case SOCKET_ADDRESS_TYPE_FD:
- fd = monitor_fd_param(monitor_cur(), addr->u.fd.str, errp);
- if (fd == -1) {
- return -1;
- }
- ret = qemu_socket_try_set_nonblock(fd);
- if (ret < 0) {
- error_setg_errno(errp, -ret, "%s: Can't use file descriptor %d",
- name, fd);
- return -1;
- }
- connected = 1;
- break;
- default:
- error_setg(errp, "only support inet, unix or fd type");
- return -1;
- }
+ s->ioc = QIO_CHANNEL(sioc);
+ s->nc.link_down = true;
+
+ qio_channel_socket_connect_async(sioc, addr,
+ net_stream_client_connected, s,
+ NULL, NULL);
- s = net_stream_fd_init(peer, model, name, fd, connected);
-
- switch (addr->type) {
- case SOCKET_ADDRESS_TYPE_INET:
- qemu_set_info_str(&s->nc, "connect to %s:%d",
- inet_ntoa(saddr_in.sin_addr),
- ntohs(saddr_in.sin_port));
- break;
- case SOCKET_ADDRESS_TYPE_UNIX:
- qemu_set_info_str(&s->nc, " connect to %s", saddr_un.sun_path);
- break;
- case SOCKET_ADDRESS_TYPE_FD:
- qemu_set_info_str(&s->nc, "connect to fd %d", fd);
- break;
- default:
- g_assert_not_reached();
- }
return 0;
}
diff --git a/qemu-options.hx b/qemu-options.hx
index fafb214cb89f..a91f96a264cc 100644
--- a/qemu-options.hx
+++ b/qemu-options.hx
@@ -2772,8 +2772,8 @@ DEF("netdev", HAS_ARG, QEMU_OPTION_netdev,
"-netdev socket,id=str[,fd=h][,udp=host:port][,localaddr=host:port]\n"
" configure a network backend to connect to another network\n"
" using an UDP tunnel\n"
- "-netdev stream,id=str[,server=on|off],addr.type=inet,addr.host=host,addr.port=port\n"
- "-netdev stream,id=str[,server=on|off],addr.type=unix,addr.path=path\n"
+ "-netdev stream,id=str[,server=on|off],addr.type=inet,addr.host=host,addr.port=port[,to=maxport][,numeric=on|off][,keep-alive=on|off][,mptcp=on|off][,addr.ipv4=on|off][,addr.ipv6=on|off]\n"
+ "-netdev stream,id=str[,server=on|off],addr.type=unix,addr.path=path[,abstract=on|off][,tight=on|off]\n"
"-netdev stream,id=str[,server=on|off],addr.type=fd,addr.str=file-descriptor\n"
" configure a network backend to connect to another network\n"
" using a socket connection in stream mode.\n"
--
2.37.3
On 21/10/22 11:09, Laurent Vivier wrote:
> Use QIOChannel, QIOChannelSocket and QIONetListener.
> This allows net/stream to use all the available parameters provided by
> SocketAddress.
>
> Signed-off-by: Laurent Vivier <lvivier@redhat.com>
> Acked-by: Michael S. Tsirkin <mst@redhat.com>
> ---
> net/stream.c | 492 +++++++++++++++++-------------------------------
> qemu-options.hx | 4 +-
> 2 files changed, 178 insertions(+), 318 deletions(-)
> -static void net_stream_accept(void *opaque)
> +static void net_stream_server_listening(QIOTask *task, gpointer opaque)
> {
> NetStreamState *s = opaque;
> - struct sockaddr_storage saddr;
> - socklen_t len;
> - int fd;
> -
> - for (;;) {
> - len = sizeof(saddr);
> - fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len);
> - if (fd < 0 && errno != EINTR) {
> - return;
> - } else if (fd >= 0) {
> - qemu_set_fd_handler(s->listen_fd, NULL, NULL, NULL);
> - break;
> - }
> - }
> + QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(s->listen_ioc);
> + SocketAddress *addr;
> + int ret;
>
> - s->fd = fd;
> - s->nc.link_down = false;
> - net_stream_connect(s);
> - switch (saddr.ss_family) {
> - case AF_INET: {
> - struct sockaddr_in *saddr_in = (struct sockaddr_in *)&saddr;
> -
> - qemu_set_info_str(&s->nc, "connection from %s:%d",
> - inet_ntoa(saddr_in->sin_addr),
> - ntohs(saddr_in->sin_port));
> - break;
> + if (listen_sioc->fd < 0) {
> + qemu_set_info_str(&s->nc, "connection error");
> + return;
> }
> - case AF_UNIX: {
> - struct sockaddr_un saddr_un;
>
> - len = sizeof(saddr_un);
> - getsockname(s->listen_fd, (struct sockaddr *)&saddr_un, &len);
> - qemu_set_info_str(&s->nc, "connect from %s", saddr_un.sun_path);
> - break;
> - }
> - default:
> - g_assert_not_reached();
> + addr = qio_channel_socket_get_local_address(listen_sioc, NULL);
> + g_assert(addr != NULL);
The Error* is not propagated here (observed in v12).
> + ret = qemu_socket_try_set_nonblock(listen_sioc->fd);
> + if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
> + qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
> + addr->u.fd.str, -ret);
> + return;
> }
> + g_assert(ret == 0);
> + qapi_free_SocketAddress(addr);
> +
> + s->nc.link_down = true;
> + s->listener = qio_net_listener_new();
> +
> + net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
> + qio_net_listener_set_client_func(s->listener, net_stream_listen, s, NULL);
> + qio_net_listener_add(s->listener, listen_sioc);
> }
>
> static int net_stream_server_init(NetClientState *peer,
> @@ -283,105 +286,61 @@ static int net_stream_server_init(NetClientState *peer,
> {
> NetClientState *nc;
> NetStreamState *s;
> - int fd, ret;
> + QIOChannelSocket *listen_sioc = qio_channel_socket_new();
>
> - switch (addr->type) {
> - case SOCKET_ADDRESS_TYPE_INET: {
> - struct sockaddr_in saddr_in;
> -
> - if (convert_host_port(&saddr_in, addr->u.inet.host, addr->u.inet.port,
> - errp) < 0) {
> - return -1;
> - }
> -
> - fd = qemu_socket(PF_INET, SOCK_STREAM, 0);
> - if (fd < 0) {
> - error_setg_errno(errp, errno, "can't create stream socket");
> - return -1;
> - }
> - qemu_socket_set_nonblock(fd);
> + nc = qemu_new_net_client(&net_stream_info, peer, model, name);
> + s = DO_UPCAST(NetStreamState, nc, nc);
>
> - socket_set_fast_reuse(fd);
> + s->listen_ioc = QIO_CHANNEL(listen_sioc);
> + qio_channel_socket_listen_async(listen_sioc, addr, 0,
> + net_stream_server_listening, s,
> + NULL, NULL);
>
> - ret = bind(fd, (struct sockaddr *)&saddr_in, sizeof(saddr_in));
> - if (ret < 0) {
> - error_setg_errno(errp, errno, "can't bind ip=%s to socket",
> - inet_ntoa(saddr_in.sin_addr));
> - closesocket(fd);
> - return -1;
> - }
> - break;
> - }
> - case SOCKET_ADDRESS_TYPE_UNIX: {
> - struct sockaddr_un saddr_un;
> -
> - ret = unlink(addr->u.q_unix.path);
> - if (ret < 0 && errno != ENOENT) {
> - error_setg_errno(errp, errno, "failed to unlink socket %s",
> - addr->u.q_unix.path);
> - return -1;
> - }
> + return 0;
> +}
>
> - saddr_un.sun_family = PF_UNIX;
> - ret = snprintf(saddr_un.sun_path, sizeof(saddr_un.sun_path), "%s",
> - addr->u.q_unix.path);
> - if (ret < 0 || ret >= sizeof(saddr_un.sun_path)) {
> - error_setg(errp, "UNIX socket path '%s' is too long",
> - addr->u.q_unix.path);
> - error_append_hint(errp, "Path must be less than %zu bytes\n",
> - sizeof(saddr_un.sun_path));
> - return -1;
> - }
> +static void net_stream_client_connected(QIOTask *task, gpointer opaque)
> +{
> + NetStreamState *s = opaque;
> + QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(s->ioc);
> + SocketAddress *addr;
> + gchar *uri;
> + int ret;
>
> - fd = qemu_socket(PF_UNIX, SOCK_STREAM, 0);
> - if (fd < 0) {
> - error_setg_errno(errp, errno, "can't create stream socket");
> - return -1;
> - }
> - qemu_socket_set_nonblock(fd);
> -
> - ret = bind(fd, (struct sockaddr *)&saddr_un, sizeof(saddr_un));
> - if (ret < 0) {
> - error_setg_errno(errp, errno, "can't create socket with path: %s",
> - saddr_un.sun_path);
> - closesocket(fd);
> - return -1;
> - }
> - break;
> - }
> - case SOCKET_ADDRESS_TYPE_FD:
> - fd = monitor_fd_param(monitor_cur(), addr->u.fd.str, errp);
> - if (fd == -1) {
> - return -1;
> - }
> - ret = qemu_socket_try_set_nonblock(fd);
> - if (ret < 0) {
> - error_setg_errno(errp, -ret, "%s: Can't use file descriptor %d",
> - name, fd);
> - return -1;
> - }
> - break;
> - default:
> - error_setg(errp, "only support inet or fd type");
> - return -1;
> + if (sioc->fd < 0) {
> + qemu_set_info_str(&s->nc, "connection error");
> + goto error;
> }
>
> - ret = listen(fd, 0);
> - if (ret < 0) {
> - error_setg_errno(errp, errno, "can't listen on socket");
> - closesocket(fd);
> - return -1;
> + addr = qio_channel_socket_get_remote_address(sioc, NULL);
> + g_assert(addr != NULL);
Ditto (Error*).
> + uri = socket_uri(addr);
> + qemu_set_info_str(&s->nc, uri);
> + g_free(uri);
> +
> + ret = qemu_socket_try_set_nonblock(sioc->fd);
> + if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
> + qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
> + addr->u.fd.str, -ret);
> + qapi_free_SocketAddress(addr);
> + goto error;
> }
> + g_assert(ret == 0);
>
> - nc = qemu_new_net_client(&net_stream_info, peer, model, name);
> - s = DO_UPCAST(NetStreamState, nc, nc);
> - s->fd = -1;
> - s->listen_fd = fd;
> - s->nc.link_down = true;
> net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
>
> - qemu_set_fd_handler(s->listen_fd, net_stream_accept, NULL, s);
> - return 0;
> + /* Disable Nagle algorithm on TCP sockets to reduce latency */
> + qio_channel_set_delay(s->ioc, false);
> +
> + s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send,
> + s, NULL);
> + s->nc.link_down = false;
> + qapi_free_SocketAddress(addr);
> +
> + return;
> +error:
> + object_unref(OBJECT(s->ioc));
> + s->ioc = NULL;
> }
Philippe Mathieu-Daudé <philmd@linaro.org> writes:
> On 21/10/22 11:09, Laurent Vivier wrote:
>> Use QIOChannel, QIOChannelSocket and QIONetListener.
>> This allows net/stream to use all the available parameters provided by
>> SocketAddress.
>> Signed-off-by: Laurent Vivier <lvivier@redhat.com>
>> Acked-by: Michael S. Tsirkin <mst@redhat.com>
>> ---
>> net/stream.c | 492 +++++++++++++++++-------------------------------
>> qemu-options.hx | 4 +-
>> 2 files changed, 178 insertions(+), 318 deletions(-)
>
>> -static void net_stream_accept(void *opaque)
>> +static void net_stream_server_listening(QIOTask *task, gpointer opaque)
>> {
>> NetStreamState *s = opaque;
>> - struct sockaddr_storage saddr;
>> - socklen_t len;
>> - int fd;
>> -
>> - for (;;) {
>> - len = sizeof(saddr);
>> - fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len);
>> - if (fd < 0 && errno != EINTR) {
>> - return;
>> - } else if (fd >= 0) {
>> - qemu_set_fd_handler(s->listen_fd, NULL, NULL, NULL);
>> - break;
>> - }
>> - }
>> + QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(s->listen_ioc);
>> + SocketAddress *addr;
>> + int ret;
>> - s->fd = fd;
>> - s->nc.link_down = false;
>> - net_stream_connect(s);
>> - switch (saddr.ss_family) {
>> - case AF_INET: {
>> - struct sockaddr_in *saddr_in = (struct sockaddr_in *)&saddr;
>> -
>> - qemu_set_info_str(&s->nc, "connection from %s:%d",
>> - inet_ntoa(saddr_in->sin_addr),
>> - ntohs(saddr_in->sin_port));
>> - break;
>> + if (listen_sioc->fd < 0) {
>> + qemu_set_info_str(&s->nc, "connection error");
>> + return;
>> }
>> - case AF_UNIX: {
>> - struct sockaddr_un saddr_un;
>> - len = sizeof(saddr_un);
>> - getsockname(s->listen_fd, (struct sockaddr *)&saddr_un, &len);
>> - qemu_set_info_str(&s->nc, "connect from %s", saddr_un.sun_path);
>> - break;
>> - }
>> - default:
>> - g_assert_not_reached();
>> + addr = qio_channel_socket_get_local_address(listen_sioc, NULL);
>> + g_assert(addr != NULL);
>
> Missing propagating Error* (observed in v12).
*If* this is really a programming error: what about &error_abort?
[...]
On 10/21/22 12:35, Markus Armbruster wrote:
> Philippe Mathieu-Daudé <philmd@linaro.org> writes:
>
>> On 21/10/22 11:09, Laurent Vivier wrote:
>>> Use QIOChannel, QIOChannelSocket and QIONetListener.
>>> This allows net/stream to use all the available parameters provided by
>>> SocketAddress.
>>> Signed-off-by: Laurent Vivier <lvivier@redhat.com>
>>> Acked-by: Michael S. Tsirkin <mst@redhat.com>
>>> ---
>>> net/stream.c | 492 +++++++++++++++++-------------------------------
>>> qemu-options.hx | 4 +-
>>> 2 files changed, 178 insertions(+), 318 deletions(-)
>>
>>> -static void net_stream_accept(void *opaque)
>>> +static void net_stream_server_listening(QIOTask *task, gpointer opaque)
>>> {
>>> NetStreamState *s = opaque;
>>> - struct sockaddr_storage saddr;
>>> - socklen_t len;
>>> - int fd;
>>> -
>>> - for (;;) {
>>> - len = sizeof(saddr);
>>> - fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len);
>>> - if (fd < 0 && errno != EINTR) {
>>> - return;
>>> - } else if (fd >= 0) {
>>> - qemu_set_fd_handler(s->listen_fd, NULL, NULL, NULL);
>>> - break;
>>> - }
>>> - }
>>> + QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(s->listen_ioc);
>>> + SocketAddress *addr;
>>> + int ret;
>>> - s->fd = fd;
>>> - s->nc.link_down = false;
>>> - net_stream_connect(s);
>>> - switch (saddr.ss_family) {
>>> - case AF_INET: {
>>> - struct sockaddr_in *saddr_in = (struct sockaddr_in *)&saddr;
>>> -
>>> - qemu_set_info_str(&s->nc, "connection from %s:%d",
>>> - inet_ntoa(saddr_in->sin_addr),
>>> - ntohs(saddr_in->sin_port));
>>> - break;
>>> + if (listen_sioc->fd < 0) {
>>> + qemu_set_info_str(&s->nc, "connection error");
>>> + return;
>>> }
>>> - case AF_UNIX: {
>>> - struct sockaddr_un saddr_un;
>>> - len = sizeof(saddr_un);
>>> - getsockname(s->listen_fd, (struct sockaddr *)&saddr_un, &len);
>>> - qemu_set_info_str(&s->nc, "connect from %s", saddr_un.sun_path);
>>> - break;
>>> - }
>>> - default:
>>> - g_assert_not_reached();
>>> + addr = qio_channel_socket_get_local_address(listen_sioc, NULL);
>>> + g_assert(addr != NULL);
>>
>> Missing propagating Error* (observed in v12).
>
> *If* this is really a programming error: what about &error_abort?
assert() informs the compiler that the following code will not use addr with a NULL value; I
don't think &error_abort does that. This could avoid an error report from a static code analyzer.
Thanks,
Laurent
Laurent Vivier <lvivier@redhat.com> writes:
> On 10/21/22 12:35, Markus Armbruster wrote:
>> Philippe Mathieu-Daudé <philmd@linaro.org> writes:
>>
>>> On 21/10/22 11:09, Laurent Vivier wrote:
>>>> Use QIOChannel, QIOChannelSocket and QIONetListener.
>>>> This allows net/stream to use all the available parameters provided by
>>>> SocketAddress.
>>>> Signed-off-by: Laurent Vivier <lvivier@redhat.com>
>>>> Acked-by: Michael S. Tsirkin <mst@redhat.com>
>>>> ---
>>>> net/stream.c | 492 +++++++++++++++++-------------------------------
>>>> qemu-options.hx | 4 +-
>>>> 2 files changed, 178 insertions(+), 318 deletions(-)
>>>
>>>> -static void net_stream_accept(void *opaque)
>>>> +static void net_stream_server_listening(QIOTask *task, gpointer opaque)
>>>> {
>>>> NetStreamState *s = opaque;
>>>> - struct sockaddr_storage saddr;
>>>> - socklen_t len;
>>>> - int fd;
>>>> -
>>>> - for (;;) {
>>>> - len = sizeof(saddr);
>>>> - fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len);
>>>> - if (fd < 0 && errno != EINTR) {
>>>> - return;
>>>> - } else if (fd >= 0) {
>>>> - qemu_set_fd_handler(s->listen_fd, NULL, NULL, NULL);
>>>> - break;
>>>> - }
>>>> - }
>>>> + QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(s->listen_ioc);
>>>> + SocketAddress *addr;
>>>> + int ret;
>>>> - s->fd = fd;
>>>> - s->nc.link_down = false;
>>>> - net_stream_connect(s);
>>>> - switch (saddr.ss_family) {
>>>> - case AF_INET: {
>>>> - struct sockaddr_in *saddr_in = (struct sockaddr_in *)&saddr;
>>>> -
>>>> - qemu_set_info_str(&s->nc, "connection from %s:%d",
>>>> - inet_ntoa(saddr_in->sin_addr),
>>>> - ntohs(saddr_in->sin_port));
>>>> - break;
>>>> + if (listen_sioc->fd < 0) {
>>>> + qemu_set_info_str(&s->nc, "connection error");
>>>> + return;
>>>> }
>>>> - case AF_UNIX: {
>>>> - struct sockaddr_un saddr_un;
>>>> - len = sizeof(saddr_un);
>>>> - getsockname(s->listen_fd, (struct sockaddr *)&saddr_un, &len);
>>>> - qemu_set_info_str(&s->nc, "connect from %s", saddr_un.sun_path);
>>>> - break;
>>>> - }
>>>> - default:
>>>> - g_assert_not_reached();
>>>> + addr = qio_channel_socket_get_local_address(listen_sioc, NULL);
>>>> + g_assert(addr != NULL);
>>>
>>> Missing propagating Error* (observed in v12).
>>
>> *If* this is really a programming error: what about &error_abort?
>
> assert() informs the compiler that following code will not use addr with a NULL value, I
> don't think &error_abort does that. This could avoid an error report in code static analyzer.
I'd expect Coverity to see right through it.
Static analyzers with a less global view won't, of course.
For what it's worth, there are about a thousand uses of &error_abort
outside tests/. I'm not aware of them confusing static analyzers we
care about.
I like &error_abort, because it makes the program crash when we try to
put the error into &error_abort, with an informative message. This is
often right where things go wrong[*]. I personally don't care much
about the better message, but others do. The better stack backtrace has
been quite useful to me.
Let's use &error_abort, and throw in the assert when a static analyzer
we care about needs it.
[*] error_propagate() messes this up. That's why the comments in
error.h ask you to do without when practical.
On 21/10/22 13:31, Markus Armbruster wrote:
> Laurent Vivier <lvivier@redhat.com> writes:
>
>> On 10/21/22 12:35, Markus Armbruster wrote:
>>> Philippe Mathieu-Daudé <philmd@linaro.org> writes:
>>>
>>>> On 21/10/22 11:09, Laurent Vivier wrote:
>>>>> Use QIOChannel, QIOChannelSocket and QIONetListener.
>>>>> This allows net/stream to use all the available parameters provided by
>>>>> SocketAddress.
>>>>> Signed-off-by: Laurent Vivier <lvivier@redhat.com>
>>>>> Acked-by: Michael S. Tsirkin <mst@redhat.com>
>>>>> ---
>>>>> net/stream.c | 492 +++++++++++++++++-------------------------------
>>>>> qemu-options.hx | 4 +-
>>>>> 2 files changed, 178 insertions(+), 318 deletions(-)
>>>>> + addr = qio_channel_socket_get_local_address(listen_sioc, NULL);
>>>>> + g_assert(addr != NULL);
>>>>
>>>> Missing propagating Error* (observed in v12).
>>>
>>> *If* this is really a programming error: what about &error_abort?
>>
>> assert() informs the compiler that following code will not use addr with a NULL value, I
>> don't think &error_abort does that. This could avoid an error report in code static analyzer.
>
> I'd expect Coverity to see right through it.
>
> Static analyzers with a less global view won't, of course.
>
> For what it's worth, there are about a thousand uses of &error_abort
> outside tests/. I'm not aware of them confusing static analyzers we
> care about.
>
> I like &error_abort, because it makes the program crash when we try to
> put the error into &error_abort, with an informative message. This is
> often right where things go wrong[*]. I personally don't care much
> about the better message, but others do. The better stack backtrace has
> been quite useful to me.
I concur:
qemu-system-x86_64: socket family 0 unsupported
VS:
ERROR:../../net/stream.c:321:net_stream_client_connected: assertion
failed: (addr != NULL)
https://lore.kernel.org/qemu-devel/6fa6b9e5-fede-0f68-752f-0c0d8fa3494f@linaro.org/
>
> Let's use &error_abort, and throw in the assert when a static analyzer
> we care about needs it.
>
>
> [*] error_propagate() messes this up. That's why the comments in
> error.h ask you to do without when practical.
>
>
© 2016 - 2026 Red Hat, Inc.