1
0
mirror of https://github.com/kataras/iris.git synced 2025-12-27 14:57:05 +00:00

websocket: from 1k to 100k on a simple raspeberry pi 3 model b by using a bit lower level of the new ws lib api and restore the previous sync.Map for server's live connections, relative: https://github.com/kataras/iris/issues/1178

Former-commit-id: 40da148afb66a42d47285efce324269d66ed3b0e
This commit is contained in:
Gerasimos (Makis) Maropoulos
2019-02-18 04:42:57 +02:00
parent eb22309aec
commit 65c1fbf7f2
8 changed files with 295 additions and 96 deletions

View File

@@ -1,4 +0,0 @@
# This is the official list of Iris Websocket authors for copyright
# purposes.
Gerasimos Maropoulos <kataras2006@hotmail.com>

View File

@@ -1,27 +0,0 @@
Copyright (c) 2017-2018 The Iris Websocket Authors. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Iris nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@@ -5,6 +5,7 @@ import (
stdContext "context"
"errors"
"io"
"io/ioutil"
"net"
"strconv"
"strings"
@@ -267,6 +268,9 @@ type (
ctx context.Context
values ConnectionValues
server *Server
writer *wsutil.Writer
// #119 , websocket writers are not protected by locks inside the gorilla's websocket code
// so we must protect them otherwise we're getting concurrent connection error on multi writers in the same time.
writerMu sync.Mutex
@@ -304,6 +308,8 @@ func newConnection(conn net.Conn, cfg ConnectionConfig) *connection {
c.defaultMessageType = BinaryMessage
}
// c.writer = wsutil.NewWriter(conn, c.getState(), c.defaultMessageType)
return c
}
@@ -350,17 +356,26 @@ func (c *connection) getState() ws.State {
// Write writes a raw websocket message with a specific type to the client
// used by ping messages and any CloseMessage types.
func (c *connection) Write(websocketMessageType ws.OpCode, data []byte) error {
func (c *connection) Write(websocketMessageType ws.OpCode, data []byte) (err error) {
// for any-case the app tries to write from different goroutines,
// we must protect them because they're reporting that as bug...
c.writerMu.Lock()
defer c.writerMu.Unlock()
if writeTimeout := c.config.WriteTimeout; writeTimeout > 0 {
// set the write deadline based on the configuration
c.underline.SetWriteDeadline(time.Now().Add(writeTimeout))
}
err := wsutil.WriteMessage(c.underline, c.getState(), websocketMessageType, data)
c.writerMu.Unlock()
// 2.
// if websocketMessageType != c.defaultMessageType {
// err = wsutil.WriteMessage(c.underline, c.getState(), websocketMessageType, data)
// } else {
// _, err = c.writer.Write(data)
// c.writer.Flush()
// }
err = wsutil.WriteMessage(c.underline, c.getState(), websocketMessageType, data)
if err != nil {
// if failed then the connection is off, fire the disconnect
c.Disconnect()
@@ -440,29 +455,125 @@ func (c *connection) isErrClosed(err error) bool {
}
func (c *connection) startReader() {
defer c.Disconnect()
hasReadTimeout := c.config.ReadTimeout > 0
for {
if c == nil || c.underline == nil || atomic.LoadUint32(&c.disconnected) > 0 {
return
}
controlHandler := wsutil.ControlFrameHandler(c.underline, c.getState())
rd := wsutil.Reader{
Source: c.underline,
State: c.getState(),
CheckUTF8: false,
SkipHeaderCheck: false,
OnIntermediate: controlHandler,
}
for {
if hasReadTimeout {
// set the read deadline based on the configuration
c.underline.SetReadDeadline(time.Now().Add(c.config.ReadTimeout))
}
data, code, err := wsutil.ReadData(c.underline, c.getState())
if code == CloseMessage || c.isErrClosed(err) {
c.Disconnect()
hdr, err := rd.NextFrame()
if err != nil {
return
}
if hdr.OpCode.IsControl() {
if err := controlHandler(hdr, &rd); err != nil {
return
}
continue
}
if hdr.OpCode&TextMessage == 0 && hdr.OpCode&BinaryMessage == 0 {
if err := rd.Discard(); err != nil {
return
}
continue
}
data, err := ioutil.ReadAll(&rd)
if err != nil {
return
}
if err != nil {
c.FireOnError(err)
}
c.messageReceived(data)
// 4.
// var buf bytes.Buffer
// data, code, err := wsutil.ReadData(struct {
// io.Reader
// io.Writer
// }{c.underline, &buf}, c.getState())
// if err != nil {
// if _, closed := err.(*net.OpError); closed && code == 0 {
// c.Disconnect()
// return
// } else if _, closed = err.(wsutil.ClosedError); closed {
// c.Disconnect()
// return
// // > 1200 conns but I don't know why yet:
// } else if err == ws.ErrProtocolOpCodeReserved || err == ws.ErrProtocolNonZeroRsv {
// c.Disconnect()
// return
// } else if err == io.EOF || err == io.ErrUnexpectedEOF {
// c.Disconnect()
// return
// }
// c.FireOnError(err)
// }
// c.messageReceived(data)
// 2.
// header, err := reader.NextFrame()
// if err != nil {
// println("next frame err: " + err.Error())
// return
// }
// if header.OpCode == ws.OpClose { // io.EOF.
// return
// }
// payload := make([]byte, header.Length)
// _, err = io.ReadFull(reader, payload)
// if err != nil {
// return
// }
// if header.Masked {
// ws.Cipher(payload, header.Mask, 0)
// }
// c.messageReceived(payload)
// data, code, err := wsutil.ReadData(c.underline, c.getState())
// // if code == CloseMessage || c.isErrClosed(err) {
// // c.Disconnect()
// // return
// // }
// if err != nil {
// if _, closed := err.(*net.OpError); closed && code == 0 {
// c.Disconnect()
// return
// } else if _, closed = err.(wsutil.ClosedError); closed {
// c.Disconnect()
// return
// // > 1200 conns but I don't know why yet:
// } else if err == ws.ErrProtocolOpCodeReserved || err == ws.ErrProtocolNonZeroRsv {
// c.Disconnect()
// return
// } else if err == io.EOF || err == io.ErrUnexpectedEOF {
// c.Disconnect()
// return
// }
// c.FireOnError(err)
// }
// c.messageReceived(data)
}
}
@@ -801,6 +912,16 @@ var ErrBadHandshake = ws.ErrHandshakeBadConnection
//
// Custom dialers can be used by wrapping the iris websocket connection via `websocket.WrapConnection`.
func Dial(ctx stdContext.Context, url string, cfg ConnectionConfig) (ClientConnection, error) {
c, err := dial(ctx, url, cfg)
if err != nil {
time.Sleep(1 * time.Second)
c, err = dial(ctx, url, cfg)
}
return c, err
}
func dial(ctx stdContext.Context, url string, cfg ConnectionConfig) (ClientConnection, error) {
if ctx == nil {
ctx = stdContext.Background()
}

View File

@@ -45,9 +45,9 @@ type (
// Use a route to serve this file on a specific path, i.e
// app.Any("/iris-ws.js", func(ctx iris.Context) { ctx.Write(mywebsocketServer.ClientSource) })
ClientSource []byte
connections map[string]*connection // key = the Connection ID.
rooms map[string][]string // by default a connection is joined to a room which has the connection id as its name
mu sync.RWMutex // for rooms and connections.
connections sync.Map // key = the Connection ID. // key = the Connection ID.
rooms map[string][]string // by default a connection is joined to a room which has the connection id as its name
mu sync.RWMutex // for rooms.
onConnectionListeners []ConnectionFunc
//connectionPool sync.Pool // sadly we can't make this because the websocket connection is live until is closed.
upgrader ws.HTTPUpgrader
@@ -64,7 +64,7 @@ func New(cfg Config) *Server {
return &Server{
config: cfg,
ClientSource: bytes.Replace(ClientSource, []byte(DefaultEvtMessageKey), cfg.EvtMessagePrefix, -1),
connections: make(map[string]*connection),
connections: sync.Map{}, // ready-to-use, this is not necessary.
rooms: make(map[string][]string),
onConnectionListeners: make([]ConnectionFunc, 0),
upgrader: ws.DefaultHTTPUpgrader, // ws.DefaultUpgrader,
@@ -126,14 +126,19 @@ func (s *Server) Upgrade(ctx context.Context) Connection {
}
func (s *Server) addConnection(c *connection) {
s.mu.Lock()
s.connections[c.id] = c
s.mu.Unlock()
s.connections.Store(c.id, c)
}
func (s *Server) getConnection(connID string) (*connection, bool) {
c, ok := s.connections[connID]
return c, ok
if cValue, ok := s.connections.Load(connID); ok {
// this cast is not necessary,
// we know that we always save a connection, but for good or worse let it be here.
if conn, ok := cValue.(*connection); ok {
return conn, ok
}
}
return nil, false
}
// wrapConnection wraps an underline connection to an iris websocket connection.
@@ -278,24 +283,34 @@ func (s *Server) leave(roomName string, connID string) (left bool) {
// GetTotalConnections returns the number of total connections
func (s *Server) GetTotalConnections() (n int) {
s.mu.RLock()
n = len(s.connections)
s.mu.RUnlock()
s.connections.Range(func(k, v interface{}) bool {
n++
return true
})
return
}
// GetConnections returns all connections
func (s *Server) GetConnections() []Connection {
s.mu.RLock()
conns := make([]Connection, len(s.connections))
// first call of Range to get the total length, we don't want to use append or manually grow the list here for many reasons.
length := s.GetTotalConnections()
conns := make([]Connection, length, length)
i := 0
for _, c := range s.connections {
conns[i] = c
// second call of Range.
s.connections.Range(func(k, v interface{}) bool {
conn, ok := v.(*connection)
if !ok {
// if for some reason (should never happen), the value is not stored as *connection
// then stop the iteration and don't continue insertion of the result connections
// in order to avoid any issues while end-dev will try to iterate a nil entry.
return false
}
conns[i] = conn
i++
}
return true
})
s.mu.RUnlock()
return conns
}
@@ -317,8 +332,10 @@ func (s *Server) GetConnectionsByRoom(roomName string) []Connection {
if connIDs, found := s.rooms[roomName]; found {
for _, connID := range connIDs {
// existence check is not necessary here.
if conn, ok := s.connections[connID]; ok {
conns = append(conns, conn)
if cValue, ok := s.connections.Load(connID); ok {
if conn, ok := cValue.(*connection); ok {
conns = append(conns, conn)
}
}
}
}
@@ -358,20 +375,32 @@ func (s *Server) emitMessage(from, to string, data []byte) {
}
}
} else {
s.mu.RLock()
// it suppose to send the message to all opened connections or to all except the sender.
for _, conn := range s.connections {
if to != All && to != conn.id { // if it's not suppose to send to all connections (including itself)
if to == Broadcast && from == conn.id { // if broadcast to other connections except this
// here we do the opossite of previous block,
// just skip this connection when it's suppose to send the message to all connections except the sender.
continue
}
s.connections.Range(func(k, v interface{}) bool {
connID, ok := k.(string)
if !ok {
// should never happen.
return true
}
conn.writeDefault(data)
}
s.mu.RUnlock()
if to != All && to != connID { // if it's not suppose to send to all connections (including itself)
if to == Broadcast && from == connID { // if broadcast to other connections except this
// here we do the opossite of previous block,
// just skip this connection when it's suppose to send the message to all connections except the sender.
return true
}
}
// not necessary cast.
conn, ok := v.(*connection)
if ok {
// send to the client(s) when the top validators passed
conn.writeDefault(data)
}
return ok
})
}
}
@@ -395,9 +424,7 @@ func (s *Server) Disconnect(connID string) (err error) {
// fire the disconnect callbacks, if any.
conn.fireDisconnect()
s.mu.Lock()
delete(s.connections, conn.id)
s.mu.Unlock()
s.connections.Delete(connID)
err = conn.underline.Close()
}