Commit 84111f5

Zijian Zhang authored and Kernel Patches Daemon committed
tcp_bpf: improve ingress redirection performance with message corking
The TCP_BPF ingress redirection path currently lacks the message corking
mechanism found in standard TCP. This causes the sender to wake up the
receiver for every message, even when messages are small, resulting in
reduced throughput compared to regular TCP in certain scenarios.

This change introduces a kernel worker-based intermediate layer to provide
automatic message corking for TCP_BPF. While this adds a slight latency
overhead, it significantly improves overall throughput by reducing
unnecessary wake-ups and reducing the sock lock contention.

Reviewed-by: Amery Hung <[email protected]>
Co-developed-by: Cong Wang <[email protected]>
Signed-off-by: Cong Wang <[email protected]>
Signed-off-by: Zijian Zhang <[email protected]>
1 parent 9e60af8 commit 84111f5

File tree: 3 files changed, +347 -8 lines changed


include/linux/skmsg.h (+19)
@@ -15,6 +15,8 @@
 
 #define MAX_MSG_FRAGS			MAX_SKB_FRAGS
 #define NR_MSG_FRAG_IDS			(MAX_MSG_FRAGS + 1)
+/* GSO size for TCP BPF backlog processing */
+#define TCP_BPF_GSO_SIZE		65536
 
 enum __sk_action {
 	__SK_DROP = 0,
@@ -85,8 +87,10 @@ struct sk_psock {
 	struct sock			*sk_redir;
 	u32				apply_bytes;
 	u32				cork_bytes;
+	u32				backlog_since_notify;
 	u8				eval;
 	u8				redir_ingress : 1; /* undefined if sk_redir is null */
+	u8				backlog_work_delayed : 1;
 	struct sk_msg			*cork;
 	struct sk_psock_progs		progs;
 #if IS_ENABLED(CONFIG_BPF_STREAM_PARSER)
@@ -97,6 +101,9 @@ struct sk_psock {
 	struct sk_buff_head		ingress_skb;
 	struct list_head		ingress_msg;
 	spinlock_t			ingress_lock;
+	struct list_head		backlog_msg;
+	/* spin_lock for backlog_msg and backlog_since_notify */
+	spinlock_t			backlog_msg_lock;
 	unsigned long			state;
 	struct list_head		link;
 	spinlock_t			link_lock;
@@ -117,11 +124,13 @@ struct sk_psock {
 	struct mutex			work_mutex;
 	struct sk_psock_work_state	work_state;
 	struct delayed_work		work;
+	struct delayed_work		backlog_work;
 	struct sock			*sk_pair;
 	struct rcu_work			rwork;
 };
 
 struct sk_msg *sk_msg_alloc(gfp_t gfp);
+bool sk_msg_try_coalesce_ok(struct sk_msg *msg, int elem_first_coalesce);
 int sk_msg_expand(struct sock *sk, struct sk_msg *msg, int len,
 		  int elem_first_coalesce);
 int sk_msg_clone(struct sock *sk, struct sk_msg *dst, struct sk_msg *src,
@@ -396,9 +405,19 @@ static inline void sk_psock_report_error(struct sk_psock *psock, int err)
 	sk_error_report(sk);
 }
 
+void sk_psock_backlog_msg(struct sk_psock *psock);
 struct sk_psock *sk_psock_init(struct sock *sk, int node);
 void sk_psock_stop(struct sk_psock *psock);
 
+static inline void sk_psock_run_backlog_work(struct sk_psock *psock,
+					     bool delayed)
+{
+	if (!sk_psock_test_state(psock, SK_PSOCK_TX_ENABLED))
+		return;
+	psock->backlog_work_delayed = delayed;
+	schedule_delayed_work(&psock->backlog_work, delayed ? 1 : 0);
+}
+
 #if IS_ENABLED(CONFIG_BPF_STREAM_PARSER)
 int sk_psock_init_strp(struct sock *sk, struct sk_psock *psock);
 void sk_psock_start_strp(struct sock *sk, struct sk_psock *psock);
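
The hunks above only declare the corking infrastructure; the producer that feeds it is presumably part of the third changed file, which is not shown on this page. Below is a rough, hypothetical sketch of how a sender-side ingress redirect could use these declarations; the helper name and the exact memory accounting are assumptions, not the patch's actual code.

/* Hypothetical producer sketch only -- not a hunk from this commit.
 * A redirecting sender parks the message on the peer psock's backlog
 * list and kicks the worker instead of waking the receiver directly.
 */
static void example_backlog_enqueue(struct sk_psock *psock_to,
				    struct sock *sk_from,
				    struct sk_msg *msg)
{
	bool was_empty;

	sock_hold(sk_from);		/* reference dropped by the worker */
	msg->sk = sk_from;
	/* the send-side memory stays accounted to the sender until the
	 * worker transfers it (see sk_wmem_queued_add()/sk_mem_uncharge()
	 * in sk_psock_backlog_msg() below)
	 */
	sk_wmem_queued_add(sk_from, msg->sg.size);

	spin_lock(&psock_to->backlog_msg_lock);
	was_empty = list_empty(&psock_to->backlog_msg);
	list_add_tail(&msg->list, &psock_to->backlog_msg);
	spin_unlock(&psock_to->backlog_msg_lock);

	/* assumed policy: run immediately if the backlog was idle, otherwise
	 * delay by one jiffy so that small messages get corked together
	 */
	sk_psock_run_backlog_work(psock_to, !was_empty);
}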

net/core/skmsg.c (+138 -1)
@@ -12,7 +12,7 @@
 
 struct kmem_cache *sk_msg_cachep;
 
-static bool sk_msg_try_coalesce_ok(struct sk_msg *msg, int elem_first_coalesce)
+bool sk_msg_try_coalesce_ok(struct sk_msg *msg, int elem_first_coalesce)
 {
 	if (msg->sg.end > msg->sg.start &&
 	    elem_first_coalesce < msg->sg.end)
@@ -707,6 +707,118 @@ static void sk_psock_backlog(struct work_struct *work)
 	mutex_unlock(&psock->work_mutex);
 }
 
+static bool backlog_notify(struct sk_psock *psock, bool m_sched_failed,
+			   bool ingress_empty)
+{
+	/* Notify if:
+	 * 1. We have corked enough bytes
+	 * 2. We have already delayed notification
+	 * 3. Memory allocation failed
+	 * 4. Ingress queue was empty and we're about to add data
+	 */
+	return psock->backlog_since_notify >= TCP_BPF_GSO_SIZE ||
+	       psock->backlog_work_delayed ||
+	       m_sched_failed ||
+	       ingress_empty;
+}
+
+static bool backlog_xfer_to_local(struct sk_psock *psock, struct sock *sk_from,
+				  struct list_head *local_head, u32 *tot_size)
+{
+	struct sock *sk = psock->sk;
+	struct sk_msg *msg, *tmp;
+	u32 size = 0;
+
+	list_for_each_entry_safe(msg, tmp, &psock->backlog_msg, list) {
+		if (msg->sk != sk_from)
+			break;
+
+		if (!__sk_rmem_schedule(sk, msg->sg.size, false))
+			return true;
+
+		list_move_tail(&msg->list, local_head);
+		sk_wmem_queued_add(msg->sk, -msg->sg.size);
+		sock_put(msg->sk);
+		msg->sk = NULL;
+		psock->backlog_since_notify += msg->sg.size;
+		size += msg->sg.size;
+	}
+
+	*tot_size = size;
+	return false;
+}
+
+/* This function handles the transfer of backlogged messages from the sender
+ * backlog queue to the ingress queue of the peer socket. Notification of data
+ * availability will be sent under some conditions.
+ */
+void sk_psock_backlog_msg(struct sk_psock *psock)
+{
+	bool rmem_schedule_failed = false;
+	struct sock *sk_from = NULL;
+	struct sock *sk = psock->sk;
+	LIST_HEAD(local_head);
+	struct sk_msg *msg;
+	bool should_notify;
+	u32 tot_size = 0;
+
+	if (!sk_psock_test_state(psock, SK_PSOCK_TX_ENABLED))
+		return;
+
+	lock_sock(sk);
+	spin_lock(&psock->backlog_msg_lock);
+
+	msg = list_first_entry_or_null(&psock->backlog_msg,
+				       struct sk_msg, list);
+	if (!msg) {
+		should_notify = !list_empty(&psock->ingress_msg);
+		spin_unlock(&psock->backlog_msg_lock);
+		goto notify;
+	}
+
+	sk_from = msg->sk;
+	sock_hold(sk_from);
+
+	rmem_schedule_failed = backlog_xfer_to_local(psock, sk_from,
+						     &local_head, &tot_size);
+	should_notify = backlog_notify(psock, rmem_schedule_failed,
+				       list_empty(&psock->ingress_msg));
+	spin_unlock(&psock->backlog_msg_lock);
+
+	spin_lock_bh(&psock->ingress_lock);
+	list_splice_tail_init(&local_head, &psock->ingress_msg);
+	spin_unlock_bh(&psock->ingress_lock);
+
+	atomic_add(tot_size, &sk->sk_rmem_alloc);
+	sk_mem_charge(sk, tot_size);
+
+notify:
+	if (should_notify) {
+		psock->backlog_since_notify = 0;
+		sk_psock_data_ready(sk, psock);
+		if (!list_empty(&psock->backlog_msg))
+			sk_psock_run_backlog_work(psock, rmem_schedule_failed);
+	} else {
+		sk_psock_run_backlog_work(psock, true);
+	}
+	release_sock(sk);
+
+	if (sk_from) {
+		bool slow = lock_sock_fast(sk_from);
+
+		sk_mem_uncharge(sk_from, tot_size);
+		unlock_sock_fast(sk_from, slow);
+		sock_put(sk_from);
+	}
+}
+
+static void sk_psock_backlog_msg_work(struct work_struct *work)
+{
+	struct delayed_work *dwork = to_delayed_work(work);
+
+	sk_psock_backlog_msg(container_of(dwork, struct sk_psock, backlog_work));
+}
+
 struct sk_psock *sk_psock_init(struct sock *sk, int node)
 {
 	struct sk_psock *psock;
@@ -744,8 +856,11 @@ struct sk_psock *sk_psock_init(struct sock *sk, int node)
 
 	INIT_DELAYED_WORK(&psock->work, sk_psock_backlog);
 	mutex_init(&psock->work_mutex);
+	INIT_DELAYED_WORK(&psock->backlog_work, sk_psock_backlog_msg_work);
 	INIT_LIST_HEAD(&psock->ingress_msg);
 	spin_lock_init(&psock->ingress_lock);
+	INIT_LIST_HEAD(&psock->backlog_msg);
+	spin_lock_init(&psock->backlog_msg_lock);
 	skb_queue_head_init(&psock->ingress_skb);
 
 	sk_psock_set_state(psock, SK_PSOCK_TX_ENABLED);
@@ -799,6 +914,26 @@ static void __sk_psock_zap_ingress(struct sk_psock *psock)
 	__sk_psock_purge_ingress_msg(psock);
 }
 
+static void __sk_psock_purge_backlog_msg(struct sk_psock *psock)
+{
+	struct sk_msg *msg, *tmp;
+
+	spin_lock(&psock->backlog_msg_lock);
+	list_for_each_entry_safe(msg, tmp, &psock->backlog_msg, list) {
+		struct sock *sk_from = msg->sk;
+		bool slow;
+
+		list_del(&msg->list);
+		slow = lock_sock_fast(sk_from);
+		sk_wmem_queued_add(sk_from, -msg->sg.size);
+		sock_put(sk_from);
+		sk_msg_free(sk_from, msg);
+		unlock_sock_fast(sk_from, slow);
+		kfree_sk_msg(msg);
+	}
+	spin_unlock(&psock->backlog_msg_lock);
+}
+
 static void sk_psock_link_destroy(struct sk_psock *psock)
 {
 	struct sk_psock_link *link, *tmp;
@@ -828,7 +963,9 @@ static void sk_psock_destroy(struct work_struct *work)
 	sk_psock_done_strp(psock);
 
 	cancel_delayed_work_sync(&psock->work);
+	cancel_delayed_work_sync(&psock->backlog_work);
 	__sk_psock_zap_ingress(psock);
+	__sk_psock_purge_backlog_msg(psock);
 	mutex_destroy(&psock->work_mutex);
 
 	psock_progs_drop(&psock->progs);