From a19ed09ab0c43535fed852585de5765456875a0a Mon Sep 17 00:00:00 2001 From: wangzhuowei Date: Thu, 14 Sep 2023 15:24:49 +0800 Subject: [PATCH 1/5] chore: add server close test --- connection_test.go | 83 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/connection_test.go b/connection_test.go index ec72a0c9..3ecaba30 100644 --- a/connection_test.go +++ b/connection_test.go @@ -540,3 +540,86 @@ func TestParallelShortConnection(t *testing.T) { time.Sleep(time.Millisecond * 100) } } + +func TestConnectionServerClose(t *testing.T) { + ln, err := createTestListener("tcp", ":12345") + MustNil(t, err) + defer ln.Close() + + /* + Client Server + - Client --- connect --> Server + - Client <-- [ping] --- Server + - Client --- [pong] --> Server + - Client <-- close --- Server + - Client --- close --> Server + */ + + var wg sync.WaitGroup + el, err := NewEventLoop( + func(ctx context.Context, connection Connection) error { + defer wg.Done() + buf, err := connection.Reader().Next(4) // pong + Equal(t, string(buf), "pong") + MustNil(t, err) + return connection.Close() + }, + WithOnConnect(func(ctx context.Context, connection Connection) context.Context { + defer wg.Done() + // check OnPrepare + v := ctx.Value("prepare").(string) + Equal(t, v, "true") + + send := []byte("ping") + _, err = connection.Writer().WriteBinary(send) + MustNil(t, err) + err = connection.Writer().Flush() + MustNil(t, err) + connection.AddCloseCallback(func(connection Connection) error { + wg.Done() + return nil + }) + return ctx + }), + WithOnPrepare(func(connection Connection) context.Context { + defer wg.Done() + return context.WithValue(context.Background(), "prepare", "true") + }), + ) + defer el.Shutdown(context.Background()) + go func() { + err := el.Serve(ln) + if err != nil { + t.Logf("servce end with error: %v", err) + } + }() + + var onRequest OnRequest = func(ctx context.Context, connection Connection) error { + defer wg.Done() + buf, err := connection.Reader().Next(4) + MustNil(t, err) + Equal(t, string(buf), "ping") + _, err = connection.Writer().WriteString("pong") + MustNil(t, err) + err = connection.Writer().Flush() + MustNil(t, err) + return nil + } + conns := 100 + // server: OnPrepare, OnConnect, OnRequest, CloseCallback + // client: OnRequest, CloseCallback + wg.Add(conns * 6) + for i := 0; i < conns; i++ { + go func() { + conn, err := DialConnection("tcp", ":12345", time.Second) + MustNil(t, err) + err = conn.SetOnRequest(onRequest) + MustNil(t, err) + conn.AddCloseCallback(func(connection Connection) error { + wg.Done() + return nil + }) + }() + } + wg.Wait() +} From 41673bdd618911d3c9ee7fccfd4ff5f98d4952cb Mon Sep 17 00:00:00 2001 From: wangzhuowei Date: Thu, 14 Sep 2023 17:09:20 +0800 Subject: [PATCH 2/5] fix: protect detach twice --- connection_onevent.go | 24 +++++++++++++++++++----- connection_reactor.go | 9 +++++---- connection_test.go | 12 +++++++++--- fd_operator.go | 7 +++++++ 4 files changed, 40 insertions(+), 12 deletions(-) diff --git a/connection_onevent.go b/connection_onevent.go index 6ddbf31e..a8bd55e6 100644 --- a/connection_onevent.go +++ b/connection_onevent.go @@ -48,6 +48,7 @@ type onEvent struct { onConnectCallback atomic.Value onRequestCallback atomic.Value closeCallbacks atomic.Value // value is latest *callbackNode + closeCallbackOnce int32 } type callbackNode struct { @@ -195,13 +196,26 @@ func (c *connection) onProcess(isProcessable func(c *connection) bool, process f if isProcessable(c) { process(c) } - for !c.isCloseBy(user) && isProcessable(c) { + // `process` must either eventually read all the input data or actively Close the connection, + // otherwise the goroutine will fall into a dead loop. + var closedBy who + for { + closedBy = c.status(closing) + // close by user or no processable + if closedBy == user || !isProcessable(c) { + break + } process(c) } // Handling callback if connection has been closed. - if !c.IsActive() { - // connection if closed by user when processing, so it needs detach - c.closeCallback(false, true) + if closedBy != none { + // if closed by user when processing, it "may" needs detach + needDetach := closedBy == user + // Here is a conor case that operator will be detached twice: + // If server closed the connection(client OnHup will detach op first and closeBy=poller), + // and then client's OnRequest function also closed the connection(closeBy=user). + // But operator already prevent that detach twice will not cause any problem + c.closeCallback(false, needDetach) panicked = false return } @@ -229,7 +243,7 @@ func (c *connection) closeCallback(needLock bool, needDetach bool) (err error) { if needDetach && c.operator.poll != nil { // If Close is called during OnPrepare, poll is not registered. // PollDetach only happen when user call conn.Close() or poller detect error if err := c.operator.Control(PollDetach); err != nil { - logger.Printf("NETPOLL: onClose detach operator failed: %v", err) + logger.Printf("NETPOLL: closeCallback[%v,%v] detach operator failed: %v", needLock, needDetach, err) } } var latest = c.closeCallbacks.Load() diff --git a/connection_reactor.go b/connection_reactor.go index 2acd45ce..fa485be1 100644 --- a/connection_reactor.go +++ b/connection_reactor.go @@ -31,11 +31,12 @@ func (c *connection) onHup(p Poll) error { c.triggerRead(Exception(ErrEOF, "peer close")) c.triggerWrite(Exception(ErrConnClosed, "peer close")) // It depends on closing by user if OnConnect and OnRequest is nil, otherwise it needs to be released actively. - // It can be confirmed that the OnRequest goroutine has been exited before closecallback executing, + // It can be confirmed that the OnRequest goroutine has been exited before closeCallback executing, // and it is safe to close the buffer at this time. - var onConnect, _ = c.onConnectCallback.Load().(OnConnect) - var onRequest, _ = c.onRequestCallback.Load().(OnRequest) - if onConnect != nil || onRequest != nil { + var onConnect = c.onConnectCallback.Load() + var onRequest = c.onRequestCallback.Load() + var needCloseByUser = onConnect == nil && onRequest == nil + if !needCloseByUser { // already PollDetach when call OnHup c.closeCallback(true, false) } diff --git a/connection_test.go b/connection_test.go index 3ecaba30..04e58cb9 100644 --- a/connection_test.go +++ b/connection_test.go @@ -594,16 +594,21 @@ func TestConnectionServerClose(t *testing.T) { } }() - var onRequest OnRequest = func(ctx context.Context, connection Connection) error { + var clientOnRequest OnRequest = func(ctx context.Context, connection Connection) error { defer wg.Done() buf, err := connection.Reader().Next(4) MustNil(t, err) Equal(t, string(buf), "ping") + _, err = connection.Writer().WriteString("pong") MustNil(t, err) err = connection.Writer().Flush() MustNil(t, err) - return nil + + _, err = connection.Reader().Next(1) // server will not send any data, just wait for server close + MustTrue(t, errors.Is(err, ErrEOF)) // should get EOF when server close + + return connection.Close() } conns := 100 // server: OnPrepare, OnConnect, OnRequest, CloseCallback @@ -613,7 +618,7 @@ func TestConnectionServerClose(t *testing.T) { go func() { conn, err := DialConnection("tcp", ":12345", time.Second) MustNil(t, err) - err = conn.SetOnRequest(onRequest) + err = conn.SetOnRequest(clientOnRequest) MustNil(t, err) conn.AddCloseCallback(func(connection Connection) error { wg.Done() @@ -621,5 +626,6 @@ func TestConnectionServerClose(t *testing.T) { }) }() } + //time.Sleep(time.Second) wg.Wait() } diff --git a/fd_operator.go b/fd_operator.go index 4132fe9c..1ac843a9 100644 --- a/fd_operator.go +++ b/fd_operator.go @@ -42,6 +42,9 @@ type FDOperator struct { // poll is the registered location of the file descriptor. poll Poll + // protect only detach once + detached int32 + // private, used by operatorCache next *FDOperator state int32 // CAS: 0(unused) 1(inuse) 2(do-done) @@ -49,6 +52,9 @@ type FDOperator struct { } func (op *FDOperator) Control(event PollEvent) error { + if event == PollDetach && atomic.AddInt32(&op.detached, 1) > 1 { + return nil + } return op.poll.Control(op, event) } @@ -92,4 +98,5 @@ func (op *FDOperator) reset() { op.Inputs, op.InputAck = nil, nil op.Outputs, op.OutputAck = nil, nil op.poll = nil + op.detached = 0 } From 6d95fa766e9f6f68e1c3d636cd67a4df4356760c Mon Sep 17 00:00:00 2001 From: wangzhuowei Date: Thu, 14 Sep 2023 17:12:00 +0800 Subject: [PATCH 3/5] chore: fix ci --- .github/workflows/pr-check.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pr-check.yml b/.github/workflows/pr-check.yml index d61d604b..50a8ce2c 100644 --- a/.github/workflows/pr-check.yml +++ b/.github/workflows/pr-check.yml @@ -1,6 +1,6 @@ name: Push and Pull Request Check -on: [ push ] +on: [ push, pull_request ] jobs: compatibility-test: From 340195fbf7546cc09a56ab72d3ae93f9700ab036 Mon Sep 17 00:00:00 2001 From: wangzhuowei Date: Thu, 14 Sep 2023 17:13:34 +0800 Subject: [PATCH 4/5] chore: add 1.21 test --- .github/workflows/pr-check.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pr-check.yml b/.github/workflows/pr-check.yml index 50a8ce2c..a923b418 100644 --- a/.github/workflows/pr-check.yml +++ b/.github/workflows/pr-check.yml @@ -6,7 +6,7 @@ jobs: compatibility-test: strategy: matrix: - go: [ 1.15, "1.20" ] + go: [ 1.15, "1.21" ] os: [ X64, ARM64 ] runs-on: ${{ matrix.os }} steps: From 0ba765384355e32f83927c6ac683e0579520d0d5 Mon Sep 17 00:00:00 2001 From: wangzhuowei Date: Thu, 14 Sep 2023 17:29:01 +0800 Subject: [PATCH 5/5] chore: add more test log --- connection_onevent.go | 1 - connection_test.go | 29 +++++++++++++++++++---------- nocopy_linkbuffer.go | 8 ++++---- nocopy_linkbuffer_race.go | 8 ++++---- 4 files changed, 27 insertions(+), 19 deletions(-) diff --git a/connection_onevent.go b/connection_onevent.go index a8bd55e6..9b87f01b 100644 --- a/connection_onevent.go +++ b/connection_onevent.go @@ -48,7 +48,6 @@ type onEvent struct { onConnectCallback atomic.Value onRequestCallback atomic.Value closeCallbacks atomic.Value // value is latest *callbackNode - closeCallbackOnce int32 } type callbackNode struct { diff --git a/connection_test.go b/connection_test.go index 04e58cb9..6de6f017 100644 --- a/connection_test.go +++ b/connection_test.go @@ -554,34 +554,41 @@ func TestConnectionServerClose(t *testing.T) { - Client <-- close --- Server - Client --- close --> Server */ - + const PING, PONG = "ping", "pong" var wg sync.WaitGroup el, err := NewEventLoop( func(ctx context.Context, connection Connection) error { + t.Logf("server.OnRequest: addr=%s", connection.RemoteAddr()) defer wg.Done() - buf, err := connection.Reader().Next(4) // pong - Equal(t, string(buf), "pong") + buf, err := connection.Reader().Next(len(PONG)) // pong + Equal(t, string(buf), PONG) + MustNil(t, err) + err = connection.Reader().Release() + MustNil(t, err) + err = connection.Close() MustNil(t, err) - return connection.Close() + return err }, WithOnConnect(func(ctx context.Context, connection Connection) context.Context { + t.Logf("server.OnConnect: addr=%s", connection.RemoteAddr()) defer wg.Done() // check OnPrepare v := ctx.Value("prepare").(string) Equal(t, v, "true") - send := []byte("ping") - _, err = connection.Writer().WriteBinary(send) + _, err := connection.Writer().WriteBinary([]byte(PING)) MustNil(t, err) err = connection.Writer().Flush() MustNil(t, err) connection.AddCloseCallback(func(connection Connection) error { + t.Logf("server.CloseCallback: addr=%s", connection.RemoteAddr()) wg.Done() return nil }) return ctx }), WithOnPrepare(func(connection Connection) context.Context { + t.Logf("server.OnPrepare: addr=%s", connection.RemoteAddr()) defer wg.Done() return context.WithValue(context.Background(), "prepare", "true") }), @@ -595,12 +602,13 @@ func TestConnectionServerClose(t *testing.T) { }() var clientOnRequest OnRequest = func(ctx context.Context, connection Connection) error { + t.Logf("client.OnRequest: addr=%s", connection.LocalAddr()) defer wg.Done() - buf, err := connection.Reader().Next(4) + buf, err := connection.Reader().Next(len(PING)) MustNil(t, err) - Equal(t, string(buf), "ping") + Equal(t, string(buf), PING) - _, err = connection.Writer().WriteString("pong") + _, err = connection.Writer().WriteBinary([]byte(PONG)) MustNil(t, err) err = connection.Writer().Flush() MustNil(t, err) @@ -621,7 +629,8 @@ func TestConnectionServerClose(t *testing.T) { err = conn.SetOnRequest(clientOnRequest) MustNil(t, err) conn.AddCloseCallback(func(connection Connection) error { - wg.Done() + t.Logf("client.CloseCallback: addr=%s", connection.LocalAddr()) + defer wg.Done() return nil }) }() diff --git a/nocopy_linkbuffer.go b/nocopy_linkbuffer.go index 59cc6530..555ba5ce 100644 --- a/nocopy_linkbuffer.go +++ b/nocopy_linkbuffer.go @@ -461,9 +461,8 @@ func (b *LinkBuffer) WriteBinary(p []byte) (n int, err error) { } // here will copy b.growth(n) - malloc := b.write.malloc - b.write.malloc += n - return copy(b.write.buf[malloc:b.write.malloc], p), nil + buf := b.write.Malloc(n) + return copy(buf, p), nil } // WriteDirect cannot be mixed with WriteString or WriteBinary functions. @@ -578,7 +577,8 @@ func (b *LinkBuffer) GetBytes(p [][]byte) (vs [][]byte) { // // bookSize: The size of data that can be read at once. // maxSize: The maximum size of data between two Release(). In some cases, this can -// guarantee all data allocated in one node to reduce copy. +// +// guarantee all data allocated in one node to reduce copy. func (b *LinkBuffer) book(bookSize, maxSize int) (p []byte) { l := cap(b.write.buf) - b.write.malloc // grow linkBuffer diff --git a/nocopy_linkbuffer_race.go b/nocopy_linkbuffer_race.go index a785aa15..4b3635d0 100644 --- a/nocopy_linkbuffer_race.go +++ b/nocopy_linkbuffer_race.go @@ -497,9 +497,8 @@ func (b *LinkBuffer) WriteBinary(p []byte) (n int, err error) { } // here will copy b.growth(n) - malloc := b.write.malloc - b.write.malloc += n - return copy(b.write.buf[malloc:b.write.malloc], p), nil + buf := b.write.Malloc(n) + return copy(buf, p), nil } // WriteDirect cannot be mixed with WriteString or WriteBinary functions. @@ -622,7 +621,8 @@ func (b *LinkBuffer) GetBytes(p [][]byte) (vs [][]byte) { // // bookSize: The size of data that can be read at once. // maxSize: The maximum size of data between two Release(). In some cases, this can -// guarantee all data allocated in one node to reduce copy. +// +// guarantee all data allocated in one node to reduce copy. func (b *LinkBuffer) book(bookSize, maxSize int) (p []byte) { b.Lock() defer b.Unlock()