/*
drbd_receiver.c
This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
drbd is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2, or (at your option)
any later version.
drbd is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with drbd; see the file COPYING. If not, write to
the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
*/
#include <linux/module.h>
#include <asm/uaccess.h>
#include <net/sock.h>
#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_req.h"
#include "drbd_vli.h"
enum finish_epoch {
FE_STILL_LIVE,
FE_DESTROYED,
FE_RECYCLED,
};
static int drbd_do_handshake(struct drbd_conf *mdev);
static int drbd_do_auth(struct drbd_conf *mdev);
static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
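/* No __GFP_WAIT: such an allocation may fail, but it never blocks or
 * recurses into write-out; see the comment in
 * drbd_pp_first_pages_or_try_alloc() below. */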
#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
/*
 * some helper functions to deal with singly linked page lists,
* page->private being our "next" pointer.
*/
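/* A minimal sketch of a three-page chain as used below:
 *
 *	*head -> page0            -> page1            -> page2
 *	         (private = page1)   (private = page2)   (private = 0)
 *
 * page_chain_next() follows page->private as the "next" pointer;
 * a private value of 0 terminates the chain.
 */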
/* If at least n pages are linked at head, get n pages off.
* Otherwise, don't modify head, and return NULL.
* Locking is the responsibility of the caller.
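 * Example: with *head -> p0 -> p1 -> p2 and n == 2, this returns p0
 * (with the chain now terminated after p1) and leaves *head at p2.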
*/
static struct page *page_chain_del(struct page **head, int n)
{
struct page *page;
struct page *tmp;
BUG_ON(!n);
BUG_ON(!head);
page = *head;
if (!page)
return NULL;
while (page) {
tmp = page_chain_next(page);
if (--n == 0)
break; /* found sufficient pages */
if (tmp == NULL)
/* insufficient pages, don't use any of them. */
return NULL;
page = tmp;
}
/* add end of list marker for the returned list */
set_page_private(page, 0);
/* actual return value, and adjustment of head */
page = *head;
*head = tmp;
return page;
}
/* may be used outside of locks to find the tail of a (usually short)
* "private" page chain, before adding it back to a global chain head
* with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
struct page *tmp;
int i = 1;
while ((tmp = page_chain_next(page)))
++i, page = tmp;
if (len)
*len = i;
return page;
}
static int page_chain_free(struct page *page)
{
struct page *tmp;
int i = 0;
page_chain_for_each_safe(page, tmp) {
put_page(page);
++i;
}
return i;
}
static void page_chain_add(struct page **head,
struct page *chain_first, struct page *chain_last)
{
#if 1
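	/* Paranoia check: verify that chain_last really is the tail
	 * of the chain starting at chain_first. */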
struct page *tmp;
tmp = page_chain_tail(chain_first, NULL);
BUG_ON(tmp != chain_last);
#endif
/* add chain to head */
set_page_private(chain_last, (unsigned long)*head);
*head = chain_first;
}
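/* Typical pattern for giving a private chain back to the global pool
 * (as in drbd_pp_free() below): find the tail outside the lock, then
 * splice the chain in under drbd_pp_lock:
 *
 *	tmp = page_chain_tail(chain, &i);
 *	spin_lock(&drbd_pp_lock);
 *	page_chain_add(&drbd_pp_pool, chain, tmp);
 *	drbd_pp_vacant += i;
 *	spin_unlock(&drbd_pp_lock);
 */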
static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
{
struct page *page = NULL;
struct page *tmp = NULL;
int i = 0;
/* Yes, testing drbd_pp_vacant outside the lock is racy.
* So what. It saves a spin_lock. */
if (drbd_pp_vacant >= number) {
spin_lock(&drbd_pp_lock);
page = page_chain_del(&drbd_pp_pool, number);
if (page)
drbd_pp_vacant -= number;
spin_unlock(&drbd_pp_lock);
if (page)
return page;
}
/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
* "criss-cross" setup, that might cause write-out on some other DRBD,
* which in turn might block on the other node at this very place. */
for (i = 0; i < number; i++) {
tmp = alloc_page(GFP_TRY);
if (!tmp)
break;
set_page_private(tmp, (unsigned long)page);
page = tmp;
}
if (i == number)
return page;
/* Not enough pages immediately available this time.
* No need to jump around here, drbd_pp_alloc will retry this
* function "soon". */
if (page) {
tmp = page_chain_tail(page, NULL);
spin_lock(&drbd_pp_lock);
page_chain_add(&drbd_pp_pool, page, tmp);
drbd_pp_vacant += i;
spin_unlock(&drbd_pp_lock);
}
return NULL;
}
static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
{
struct drbd_epoch_entry *e;
struct list_head *le, *tle;
/* The EEs are always appended to the end of the list. Since
they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first unfinished one, we can
	   stop examining the list... */
list_for_each_safe(le, tle, &mdev->net_ee) {
e = list_entry(le, struct drbd_epoch_entry, w.list);
if (drbd_ee_has_active_page(e))
break;
list_move(le, to_be_freed);
}
}
static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
{
LIST_HEAD(reclaimed);
struct drbd_epoch_entry *e, *t;
spin_lock_irq(&mdev->req_lock);
reclaim_net_ee(mdev, &reclaimed);
spin_unlock_irq(&mdev->req_lock);
list_for_each_entry_safe(e, t, &reclaimed, w.list)
drbd_free_net_ee(mdev, e);
}
/**
 * drbd_pp_alloc() - Returns @number pages; if @retry, retries until available or signalled
* @mdev: DRBD device.
* @number: number of pages requested
* @retry: whether to retry, if not enough pages are available right now
*
* Tries to allocate number pages, first from our own page pool, then from
* the kernel, unless this allocation would exceed the max_buffers setting.
* Possibly retry until DRBD frees sufficient pages somewhere else.
*
* Returns a page chain linked via page->private.
*/
static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
{
struct page *page = NULL;
DEFINE_WAIT(wait);
	/* Yes, we may run up to @number over max_buffers. If we
	 * follow it strictly, the admin will get it wrong anyway. */
if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
page = drbd_pp_first_pages_or_try_alloc(mdev, number);
while (page == NULL) {
prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
drbd_kick_lo_and_reclaim_net(mdev);
if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
page = drbd_pp_first_pages_or_try_alloc(mdev, number);
if (page)
break;
}
if (!retry)
break;
if (signal_pending(current)) {
dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
break;
}
schedule();
}
finish_wait(&drbd_pp_wait, &wait);
if (page)
atomic_add(number, &mdev->pp_in_use);
return page;
}
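/* Pages handed out by drbd_pp_alloc() are accounted in mdev->pp_in_use
 * and must eventually be returned via drbd_pp_free(), which also wakes
 * any allocator sleeping on drbd_pp_wait. */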
/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
 * Is also used from inside another spin_lock_irq(&mdev->req_lock).
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
{
atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
int i;
if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
i = page_chain_free(page);
else {
struct page *tmp;
tmp = page_chain_tail(page, &i);
spin_lock(&drbd_pp_lock);
page_chain_add(&drbd_pp_pool, page, tmp);
drbd_pp_vacant += i;
spin_unlock(&drbd_pp_lock);
}
i = atomic_sub_return(i, a);
if (i < 0)
dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
is_net ? "pp_in_use_by_net" : "pp_in_use", i);
wake_up(&drbd_pp_wait);
}
/*
You need to hold the req_lock:
_drbd_wait_ee_list_empty()
You must not have the req_lock:
drbd_free_ee()
drbd_alloc_ee()
drbd_init_ee()
drbd_release_ee()
drbd_ee_fix_bhs()
drbd_process_done_ee()
drbd_clear_done_ee()
drbd_wait_ee_list_empty()
*/
struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
u64 id,
sector_t sector,
unsigned int data_size,
gfp_t gfp_mask) __must_hold(local)
{
struct drbd_epoch_entry *e;
struct page *page;
	unsigned nr_pages = (data_size + PAGE_SIZE - 1) >> PAGE_SHIFT;
if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
return NULL;
e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
if (!e) {
if (!(gfp_mask & __GFP_NOWARN))
dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
return NULL;
}
page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
if (!page)
goto fail;
INIT_HLIST_NODE(&e->colision);
e->epoch = NULL;
e->mdev = mdev;
e->pages = page;
atomic_set(&e->pending_bios, 0);
e->size = data_size;
e->flags = 0;
e->sector = sector;
e->block_id = id;
return e;
fail:
mempool_free(e, drbd_ee_mempool);
return NULL;
}
void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net)
{
if (e->flags & EE_HAS_DIGEST)
kfree(e->digest);
drbd_pp_free(mdev, e->pages, is_net);
D_ASSERT(atomic_read(&e->pending_bios) == 0);
D_ASSERT(hlist_unhashed(&e->colision));
mempool_free(e, drbd_ee_mempool);
}
int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
{
LIST_HEAD(work_list);
struct drbd_epoch_entry *e, *t;
int count = 0;
int is_net = list == &mdev->net_ee;
spin_lock_irq(&mdev->req_lock);
list_splice_init(list, &work_list);
spin_unlock_irq(&mdev->req_lock);
list_for_each_entry_safe(e, t, &work_list, w.list) {
drbd_free_some_ee(mdev, e, is_net);
count++;
}
return count;
}
/*
* This function is called from _asender only_
* but see also comments in _req_mod(,barrier_acked)
* and receive_Barrier.
*
* Move entries from net_ee to done_ee, if ready.
* Grab done_ee, call all callbacks, free the entries.
* The callbacks typically send out ACKs.
*/
static int drbd_process_done_ee(struct drbd_conf *mdev)
{
LIST_HEAD(work_list);
LIST_HEAD(reclaimed);
struct drbd_epoch_entry *e, *t;
int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
spin_lock_irq(&mdev->req_lock);
reclaim_net_ee(mdev, &reclaimed);
list_splice_init(&mdev->done_ee, &work_list);
spin_unlock_irq(&mdev->req_lock);
list_for_each_entry_safe(e, t, &reclaimed, w.list)
drbd_free_net_ee(mdev, e);
/* possible callbacks here:
	 * e_end_block, e_end_resync_block, e_send_discard_ack.
* all ignore the last argument.
*/
list_for_each_entry_safe(e, t, &work_list, w.list) {
/* list_del not necessary, next/prev members not touched */
ok = e->w.cb(mdev, &e->w, !ok) && ok;
drbd_free_ee(mdev, e);
}
wake_up(&mdev->ee_wait);
return ok;
}
void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
{
DEFINE_WAIT(wait);
/* avoids spin_lock/unlock
* and calling prepare_to_wait in the fast path */
while (!list_empty(head)) {
prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
spin_unlock_irq(&mdev->req_lock);
io_schedule();
finish_wait(&mdev->ee_wait, &wait);
spin_lock_irq(&mdev->req_lock);
}
}
void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
{
spin_lock_irq(&mdev->req_lock);
_drbd_wait_ee_list_empty(mdev, head);
spin_unlock_irq(&mdev->req_lock);
}
/* See also kernel_accept(), which only exists since 2.6.18.
 * We also want to log exactly which part of it failed. */
static int drbd_accept(struct drbd_conf *mdev, const char **what,
struct socket *sock, struct socket **newsock)
{
struct sock *sk = sock->sk;
int err = 0;
*what = "listen";
err = sock->ops->listen(sock, 5);
if (err < 0)
goto out;
*what = "sock_create_lite";
err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
newsock);
if (err < 0)
goto out;
*what = "accept";
err = sock->ops->accept(sock, *newsock, 0);
if (err < 0) {
sock_release(*newsock);
*newsock = NULL;
goto out;
}
(*newsock)->ops = sock->ops;
out:
return err;
}
static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
void *buf, size_t size, int flags)
{
mm_segment_t oldfs;
struct kvec iov = {
.iov_base = buf,
.iov_len = size,
};
struct msghdr msg = {
.msg_iovlen = 1,
.msg_iov = (struct iovec *)&iov,
.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
};
int rv;
oldfs = get_fs();
set_fs(KERNEL_DS);
rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
set_fs(oldfs);
return rv;
}
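/* Receive exactly @size bytes on the data socket.
 * Returns @size on success.  On a short read, an error, or a pending
 * signal, the smaller (or negative) result is returned and the
 * connection is forced into C_BROKEN_PIPE. */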
static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
{
mm_segment_t oldfs;
struct kvec iov = {
.iov_base = buf,
.iov_len = size,
};
struct msghdr msg = {
.msg_iovlen = 1,
.msg_iov = (struct iovec *)&iov,
.msg_flags = MSG_WAITALL | MSG_NOSIGNAL
};
int rv;
oldfs = get_fs();
set_fs(KERNEL_DS);
for (;;) {
rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
if (rv == size)
break;
/* Note:
* ECONNRESET other side closed the connection
* ERESTARTSYS (on sock) we got a signal
*/
if (rv < 0) {
if (rv == -ECONNRESET)
dev_info(DEV, "sock was reset by peer\n");
else if (rv != -ERESTARTSYS)
dev_err(DEV, "sock_recvmsg returned %d\n", rv);
break;
} else if (rv == 0) {
dev_info(DEV, "sock was shut down by peer\n");
break;
} else {
/* signal came in, or peer/link went down,
* after we read a partial message
*/
/* D_ASSERT(signal_pending(current)); */
break;
}
	}
set_fs(oldfs);
if (rv != size)
drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
return rv;
}
/* quoting tcp(7):
* On individual connections, the socket buffer size must be set prior to the
* listen(2) or connect(2) calls in order to have it take effect.
* This is our wrapper to do so.
*/
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
unsigned int rcv)
{
/* open coded SO_SNDBUF, SO_RCVBUF */
if (snd) {
sock->sk->sk_sndbuf = snd;
sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
}
if (rcv) {
sock->sk->sk_rcvbuf = rcv;
sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
}
}
static struct socket *drbd_try_connect(struct drbd_conf *mdev)
{
const char *what;
struct socket *sock;
struct sockaddr_in6 src_in6;
int err;
int disconnect_on_error = 1;
if (!get_net_conf(mdev))
return NULL;
what = "sock_create_kern";
err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
SOCK_STREAM, IPPROTO_TCP, &sock);
if (err < 0) {
sock = NULL;
goto out;
}
sock->sk->sk_rcvtimeo =
sock->sk->sk_sndtimeo = mdev->net_conf->try_connect_int*HZ;
drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
mdev->net_conf->rcvbuf_size);
/* explicitly bind to the configured IP as source IP
* for the outgoing connections.
* This is needed for multihomed hosts and to be
* able to use lo: interfaces for drbd.
* Make sure to use 0 as port number, so linux selects
* a free one dynamically.
*/
memcpy(&src_in6, mdev->net_conf->my_addr,
min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
src_in6.sin6_port = 0;
else
((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
what = "bind before connect";
err = sock->ops->bind(sock,
(struct sockaddr *) &src_in6,
mdev->net_conf->my_addr_len);
if (err < 0)
goto out;
/* connect may fail, peer not yet available.
* stay C_WF_CONNECTION, don't go Disconnecting! */
disconnect_on_error = 0;
what = "connect";
err = sock->ops->connect(sock,
(struct sockaddr *)mdev->net_conf->peer_addr,
mdev->net_conf->peer_addr_len, 0);
out:
if (err < 0) {
if (sock) {
sock_release(sock);
sock = NULL;
}
switch (-err) {
/* timeout, busy, signal pending */
case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
case EINTR: case ERESTARTSYS:
/* peer not (yet) available, network problem */
case ECONNREFUSED: case ENETUNREACH:
case EHOSTDOWN: case EHOSTUNREACH:
disconnect_on_error = 0;
break;
default:
dev_err(DEV, "%s failed, err = %d\n", what, err);
}
if (disconnect_on_error)
drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
}
put_net_conf(mdev);
return sock;
}
static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
{
int timeo, err;
struct socket *s_estab = NULL, *s_listen;
const char *what;
if (!get_net_conf(mdev))
return NULL;
what = "sock_create_kern";
err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
SOCK_STREAM, IPPROTO_TCP, &s_listen);
if (err) {
s_listen = NULL;
goto out;
}
timeo = mdev->net_conf->try_connect_int * HZ;
	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* +/- timeo/7 random jitter, 28.5% total spread */
s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
s_listen->sk->sk_rcvtimeo = timeo;
s_listen->sk->sk_sndtimeo = timeo;
drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,
mdev->net_conf->rcvbuf_size);
what = "bind before listen";
err = s_listen->ops->bind(s_listen,
(struct sockaddr *) mdev->net_conf->my_addr,
mdev->net_conf->my_addr_len);
if (err < 0)
goto out;
err = drbd_accept(mdev, &what, s_listen, &s_estab);
out:
if (s_listen)
sock_release(s_listen);
if (err < 0) {
if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
dev_err(DEV, "%s failed, err = %d\n", what, err);
drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
}
}
put_net_conf(mdev);
return s_estab;
}
static int drbd_send_fp(struct drbd_conf *mdev,
struct socket *sock, enum drbd_packets cmd)
{
struct p_header80 *h = &mdev->data.sbuf.header.h80;
return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
}
static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
{
struct p_header80 *h = &mdev->data.rbuf.header.h80;
int rr;
rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
return be16_to_cpu(h->command);
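	/* Short read or bad magic: return a value that matches no expected
	 * handshake packet, so the caller's switch hits the error path. */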
return 0xffff;
}
/**
* drbd_socket_okay() - Free the socket if its connection is not okay
* @mdev: DRBD device.
* @sock: pointer to the pointer to the socket.
*/
static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
{
int rr;
char tb[4];
if (!*sock)
return false;
rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
if (rr > 0 || rr == -EAGAIN) {
return true;
} else {
sock_release(*sock);
*sock = NULL;
return false;
}
}
/*
* return values:
* 1 yes, we have a valid connection
* 0 oops, did not work out, please try again
* -1 peer talks different language,
* no point in trying again, please go standalone.
* -2 We do not have a network config...
*/
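/* DRBD uses two TCP connections per peer: "sock" carries bulk data,
 * "msock" carries meta data (ACKs, pings).  Both nodes connect and
 * accept simultaneously; the initial P_HAND_SHAKE_S/P_HAND_SHAKE_M
 * packets sort the two established connections into their roles.
 * The node that accepts the meta socket sets DISCARD_CONCURRENT,
 * which is later used to break ties on concurrent writes. */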
static int drbd_connect(struct drbd_conf *mdev)
{
struct socket *s, *sock, *msock;
int try, h, ok;
D_ASSERT(!mdev->data.socket);
if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
return -2;
clear_bit(DISCARD_CONCURRENT, &mdev->flags);
sock = NULL;
msock = NULL;
do {
for (try = 0;;) {
/* 3 tries, this should take less than a second! */
s = drbd_try_connect(mdev);
if (s || ++try >= 3)
break;
/* give the other side time to call bind() & listen() */
schedule_timeout_interruptible(HZ / 10);
}
if (s) {
if (!sock) {
drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
sock = s;
s = NULL;
} else if (!msock) {
drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
msock = s;
s = NULL;
} else {
dev_err(DEV, "Logic error in drbd_connect()\n");
goto out_release_sockets;
}
}
if (sock && msock) {
schedule_timeout_interruptible(HZ / 10);
ok = drbd_socket_okay(mdev, &sock);
ok = drbd_socket_okay(mdev, &msock) && ok;
if (ok)
break;
}
retry:
s = drbd_wait_for_connect(mdev);
if (s) {
try = drbd_recv_fp(mdev, s);
drbd_socket_okay(mdev, &sock);
drbd_socket_okay(mdev, &msock);
switch (try) {
case P_HAND_SHAKE_S:
if (sock) {
dev_warn(DEV, "initial packet S crossed\n");
sock_release(sock);
}
sock = s;
break;
case P_HAND_SHAKE_M:
if (msock) {
dev_warn(DEV, "initial packet M crossed\n");
sock_release(msock);
}
msock = s;
set_bit(DISCARD_CONCURRENT, &mdev->flags);
break;
default:
dev_warn(DEV, "Error receiving initial packet\n");
sock_release(s);
if (random32() & 1)
goto retry;
}
}
if (mdev->state.conn <= C_DISCONNECTING)
goto out_release_sockets;
if (signal_pending(current)) {
flush_signals(current);
smp_rmb();
if (get_t_state(&mdev->receiver) == Exiting)
goto out_release_sockets;
}
if (sock && msock) {
ok = drbd_socket_okay(mdev, &sock);
ok = drbd_socket_okay(mdev, &msock) && ok;
if (ok)
break;
}
} while (1);
msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
sock->sk->sk_allocation = GFP_NOIO;
msock->sk->sk_allocation = GFP_NOIO;
sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
/* NOT YET ...
* sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
* sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
* first set it to the P_HAND_SHAKE timeout,
* which we set to 4x the configured ping_timeout. */
sock->sk->sk_sndtimeo =
sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
/* we don't want delays.
* we use TCP_CORK where appropriate, though */
drbd_tcp_nodelay(sock);
drbd_tcp_nodelay(msock);
mdev->data.socket = sock;
mdev->meta.socket = msock;
mdev->last_received = jiffies;
D_ASSERT(mdev->asender.task == NULL);
h = drbd_do_handshake(mdev);
if (h <= 0)
return h;
if (mdev->cram_hmac_tfm) {
/* drbd_request_state(mdev, NS(conn, WFAuth)); */
switch (drbd_do_auth(mdev)) {
case -1:
dev_err(DEV, "Authentication of peer failed\n");
return -1;
case 0:
dev_err(DEV, "Authentication of peer failed, trying again.\n");
return 0;
}
}
if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
return 0;
sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
atomic_set(&mdev->packet_seq, 0);
mdev->peer_seq = 0;
drbd_thread_start(&mdev->asender);
if (mdev->agreed_pro_version < 95 && get_ldev(mdev)) {
drbd_setup_queue_param(mdev, DRBD_MAX_SIZE_H80_PACKET);
put_ldev(mdev);
}
	return 1;

out_release_sockets:
	if (sock)
		sock_release(sock);
	if (msock)
		sock_release(msock);
	return -1;
}