aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSage Weil <sage@newdream.net>2009-12-23 15:12:31 -0500
committerSage Weil <sage@newdream.net>2009-12-23 15:12:31 -0500
commit58bb3b374b07a2a43315213f00a48a5ffd6d0915 (patch)
tree04599b1f6c5f8bf501a1070b5ab7269a9a97fece
parent04a419f908b5291ff7e8ffd7aa351fa0ac0c08af (diff)
ceph: support ceph_pagelist for message payload
The ceph_pagelist is a simple list of whole pages, strung together via their lru list_head. It facilitates encoding to a "buffer" of unknown size. Allow its use in place of the ceph_msg page vector. This will be used to fix the huge buffer preallocation woes of MDS reconnection. Signed-off-by: Sage Weil <sage@newdream.net>
-rw-r--r--fs/ceph/Makefile2
-rw-r--r--fs/ceph/messenger.c24
-rw-r--r--fs/ceph/messenger.h1
-rw-r--r--fs/ceph/pagelist.c54
-rw-r--r--fs/ceph/pagelist.h54
5 files changed, 130 insertions, 5 deletions
diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile
index 827629c85768..47caf2f1b75a 100644
--- a/fs/ceph/Makefile
+++ b/fs/ceph/Makefile
@@ -8,7 +8,7 @@ obj-$(CONFIG_CEPH_FS) += ceph.o
8 8
9ceph-objs := super.o inode.o dir.o file.o addr.o ioctl.o \ 9ceph-objs := super.o inode.o dir.o file.o addr.o ioctl.o \
10 export.o caps.o snap.o xattr.o \ 10 export.o caps.o snap.o xattr.o \
11 messenger.o msgpool.o buffer.o \ 11 messenger.o msgpool.o buffer.o pagelist.o \
12 mds_client.o mdsmap.o \ 12 mds_client.o mdsmap.o \
13 mon_client.o \ 13 mon_client.o \
14 osd_client.o osdmap.o crush/crush.o crush/mapper.o crush/hash.o \ 14 osd_client.o osdmap.o crush/crush.o crush/mapper.o crush/hash.o \
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index 68052f664280..c1106e8360f0 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -13,6 +13,7 @@
13#include "super.h" 13#include "super.h"
14#include "messenger.h" 14#include "messenger.h"
15#include "decode.h" 15#include "decode.h"
16#include "pagelist.h"
16 17
17/* 18/*
18 * Ceph uses the messenger to exchange ceph_msg messages with other 19 * Ceph uses the messenger to exchange ceph_msg messages with other
@@ -728,6 +729,11 @@ static int write_partial_msg_pages(struct ceph_connection *con)
728 page = msg->pages[con->out_msg_pos.page]; 729 page = msg->pages[con->out_msg_pos.page];
729 if (crc) 730 if (crc)
730 kaddr = kmap(page); 731 kaddr = kmap(page);
732 } else if (msg->pagelist) {
733 page = list_first_entry(&msg->pagelist->head,
734 struct page, lru);
735 if (crc)
736 kaddr = kmap(page);
731 } else { 737 } else {
732 page = con->msgr->zero_page; 738 page = con->msgr->zero_page;
733 if (crc) 739 if (crc)
@@ -750,7 +756,7 @@ static int write_partial_msg_pages(struct ceph_connection *con)
750 MSG_DONTWAIT | MSG_NOSIGNAL | 756 MSG_DONTWAIT | MSG_NOSIGNAL |
751 MSG_MORE); 757 MSG_MORE);
752 758
753 if (crc && msg->pages) 759 if (crc && (msg->pages || msg->pagelist))
754 kunmap(page); 760 kunmap(page);
755 761
756 if (ret <= 0) 762 if (ret <= 0)
@@ -762,6 +768,9 @@ static int write_partial_msg_pages(struct ceph_connection *con)
762 con->out_msg_pos.page_pos = 0; 768 con->out_msg_pos.page_pos = 0;
763 con->out_msg_pos.page++; 769 con->out_msg_pos.page++;
764 con->out_msg_pos.did_page_crc = 0; 770 con->out_msg_pos.did_page_crc = 0;
771 if (msg->pagelist)
772 list_move_tail(&page->lru,
773 &msg->pagelist->head);
765 } 774 }
766 } 775 }
767 776
@@ -1051,13 +1060,13 @@ static int process_banner(struct ceph_connection *con)
1051 &con->actual_peer_addr) && 1060 &con->actual_peer_addr) &&
1052 !(addr_is_blank(&con->actual_peer_addr.in_addr) && 1061 !(addr_is_blank(&con->actual_peer_addr.in_addr) &&
1053 con->actual_peer_addr.nonce == con->peer_addr.nonce)) { 1062 con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
1054 pr_err("wrong peer, want %s/%d, " 1063 pr_warning("wrong peer, want %s/%d, "
1055 "got %s/%d, wtf\n", 1064 "got %s/%d\n",
1056 pr_addr(&con->peer_addr.in_addr), 1065 pr_addr(&con->peer_addr.in_addr),
1057 con->peer_addr.nonce, 1066 con->peer_addr.nonce,
1058 pr_addr(&con->actual_peer_addr.in_addr), 1067 pr_addr(&con->actual_peer_addr.in_addr),
1059 con->actual_peer_addr.nonce); 1068 con->actual_peer_addr.nonce);
1060 con->error_msg = "protocol error, wrong peer"; 1069 con->error_msg = "wrong peer at address";
1061 return -1; 1070 return -1;
1062 } 1071 }
1063 1072
@@ -2096,6 +2105,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len,
2096 /* data */ 2105 /* data */
2097 m->nr_pages = calc_pages_for(page_off, page_len); 2106 m->nr_pages = calc_pages_for(page_off, page_len);
2098 m->pages = pages; 2107 m->pages = pages;
2108 m->pagelist = NULL;
2099 2109
2100 dout("ceph_msg_new %p page %d~%d -> %d\n", m, page_off, page_len, 2110 dout("ceph_msg_new %p page %d~%d -> %d\n", m, page_off, page_len,
2101 m->nr_pages); 2111 m->nr_pages);
@@ -2181,6 +2191,12 @@ void ceph_msg_last_put(struct kref *kref)
2181 m->nr_pages = 0; 2191 m->nr_pages = 0;
2182 m->pages = NULL; 2192 m->pages = NULL;
2183 2193
2194 if (m->pagelist) {
2195 ceph_pagelist_release(m->pagelist);
2196 kfree(m->pagelist);
2197 m->pagelist = NULL;
2198 }
2199
2184 if (m->pool) 2200 if (m->pool)
2185 ceph_msgpool_put(m->pool, m); 2201 ceph_msgpool_put(m->pool, m);
2186 else 2202 else
diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h
index 7e2aab1d3ce2..a7b684145092 100644
--- a/fs/ceph/messenger.h
+++ b/fs/ceph/messenger.h
@@ -85,6 +85,7 @@ struct ceph_msg {
85 struct ceph_buffer *middle; 85 struct ceph_buffer *middle;
86 struct page **pages; /* data payload. NOT OWNER. */ 86 struct page **pages; /* data payload. NOT OWNER. */
87 unsigned nr_pages; /* size of page array */ 87 unsigned nr_pages; /* size of page array */
88 struct ceph_pagelist *pagelist; /* instead of pages */
88 struct list_head list_head; 89 struct list_head list_head;
89 struct kref kref; 90 struct kref kref;
90 bool front_is_vmalloc; 91 bool front_is_vmalloc;
diff --git a/fs/ceph/pagelist.c b/fs/ceph/pagelist.c
new file mode 100644
index 000000000000..370e93695474
--- /dev/null
+++ b/fs/ceph/pagelist.c
@@ -0,0 +1,54 @@
1
2#include <linux/pagemap.h>
3#include <linux/highmem.h>
4
5#include "pagelist.h"
6
7int ceph_pagelist_release(struct ceph_pagelist *pl)
8{
9 if (pl->mapped_tail)
10 kunmap(pl->mapped_tail);
11 while (!list_empty(&pl->head)) {
12 struct page *page = list_first_entry(&pl->head, struct page,
13 lru);
14 list_del(&page->lru);
15 __free_page(page);
16 }
17 return 0;
18}
19
20static int ceph_pagelist_addpage(struct ceph_pagelist *pl)
21{
22 struct page *page = alloc_page(GFP_NOFS);
23 if (!page)
24 return -ENOMEM;
25 pl->room += PAGE_SIZE;
26 list_add_tail(&page->lru, &pl->head);
27 if (pl->mapped_tail)
28 kunmap(pl->mapped_tail);
29 pl->mapped_tail = kmap(page);
30 return 0;
31}
32
33int ceph_pagelist_append(struct ceph_pagelist *pl, void *buf, size_t len)
34{
35 while (pl->room < len) {
36 size_t bit = pl->room;
37 int ret;
38
39 memcpy(pl->mapped_tail + (pl->length & ~PAGE_CACHE_MASK),
40 buf, bit);
41 pl->length += bit;
42 pl->room -= bit;
43 buf += bit;
44 len -= bit;
45 ret = ceph_pagelist_addpage(pl);
46 if (ret)
47 return ret;
48 }
49
50 memcpy(pl->mapped_tail + (pl->length & ~PAGE_CACHE_MASK), buf, len);
51 pl->length += len;
52 pl->room -= len;
53 return 0;
54}
diff --git a/fs/ceph/pagelist.h b/fs/ceph/pagelist.h
new file mode 100644
index 000000000000..e8a4187e1087
--- /dev/null
+++ b/fs/ceph/pagelist.h
@@ -0,0 +1,54 @@
1#ifndef __FS_CEPH_PAGELIST_H
2#define __FS_CEPH_PAGELIST_H
3
4#include <linux/list.h>
5
/*
 * A buffer of unknown final size built from whole pages, which are
 * chained together through their lru list_head.
 */
6struct ceph_pagelist {
7 struct list_head head; /* list of pages, linked via page->lru */
8 void *mapped_tail; /* kmap() address of the newest page; NULL until a page is added */
9 size_t length; /* total number of bytes appended so far */
10 size_t room; /* bytes still unused in the allocated pages */
11};
12
13static inline void ceph_pagelist_init(struct ceph_pagelist *pl)
14{
15 INIT_LIST_HEAD(&pl->head);
16 pl->mapped_tail = NULL;
17 pl->length = 0;
18 pl->room = 0;
19}
20extern int ceph_pagelist_release(struct ceph_pagelist *pl);
21
22extern int ceph_pagelist_append(struct ceph_pagelist *pl, void *d, size_t l);
23
24static inline int ceph_pagelist_encode_64(struct ceph_pagelist *pl, u64 v)
25{
26 __le64 ev = cpu_to_le64(v);
27 return ceph_pagelist_append(pl, &ev, sizeof(ev));
28}
29static inline int ceph_pagelist_encode_32(struct ceph_pagelist *pl, u32 v)
30{
31 __le32 ev = cpu_to_le32(v);
32 return ceph_pagelist_append(pl, &ev, sizeof(ev));
33}
34static inline int ceph_pagelist_encode_16(struct ceph_pagelist *pl, u16 v)
35{
36 __le16 ev = cpu_to_le16(v);
37 return ceph_pagelist_append(pl, &ev, sizeof(ev));
38}
39static inline int ceph_pagelist_encode_8(struct ceph_pagelist *pl, u8 v)
40{
41 return ceph_pagelist_append(pl, &v, 1);
42}
43static inline int ceph_pagelist_encode_string(struct ceph_pagelist *pl,
44 char *s, size_t len)
45{
46 int ret = ceph_pagelist_encode_32(pl, len);
47 if (ret)
48 return ret;
49 if (len)
50 return ceph_pagelist_append(pl, s, len);
51 return 0;
52}
53
54#endif