1
0
mirror of https://github.com/kataras/iris.git synced 2026-01-02 17:57:11 +00:00

some cleanup, and remove the test 'testwebocket2' package at all; A lower-level fast websocket impl based on gobwas/ws will be published on a different repo, it is a WIP

Former-commit-id: b680974c593196ce20865ed12778929ced6afea1
This commit is contained in:
Gerasimos (Makis) Maropoulos
2019-02-22 21:24:10 +02:00
parent c477251d1f
commit bda36145e5
25 changed files with 110 additions and 2769 deletions

View File

@@ -1,4 +1,4 @@
Copyright (c) 2017-2018 The Iris Websocket Authors. All rights reserved.
Copyright (c) 2017-2019 The Iris Websocket Authors. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are

View File

@@ -1,8 +1,8 @@
package websocket
import (
"math/rand"
"net/http"
"strconv"
"time"
"github.com/kataras/iris/context"
@@ -33,19 +33,18 @@ const (
)
var (
// DefaultIDGenerator returns a random unique for a new connection.
// DefaultIDGenerator returns a random unique string for a new connection.
// Used when config.IDGenerator is nil.
DefaultIDGenerator = func(context.Context) string {
id, err := uuid.NewV4()
if err != nil {
return randomString(64)
return strconv.FormatInt(time.Now().Unix(), 10)
}
return id.String()
}
)
// Config the websocket server configuration
// all of these are optional.
// Config contains the websocket server's configuration, optional.
type Config struct {
// IDGenerator used to create (and later on, set)
// an ID for each incoming websocket connections (clients).
@@ -158,37 +157,3 @@ func (c Config) Validate() Config {
return c
}
const (
letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
letterIdxBits = 6 // 6 bits to represent a letter index
letterIdxMask = 1<<letterIdxBits - 1 // All 1-bits, as many as letterIdxBits
letterIdxMax = 63 / letterIdxBits // # of letter indices fitting in 63 bits
)
var src = rand.NewSource(time.Now().UnixNano())
// random takes a parameter (int) and returns random slice of byte
// ex: var randomstrbytes []byte; randomstrbytes = utils.Random(32)
func random(n int) []byte {
b := make([]byte, n)
// A src.Int63() generates 63 random bits, enough for letterIdxMax characters!
for i, cache, remain := n-1, src.Int63(), letterIdxMax; i >= 0; {
if remain == 0 {
cache, remain = src.Int63(), letterIdxMax
}
if idx := int(cache & letterIdxMask); idx < len(letterBytes) {
b[i] = letterBytes[idx]
i--
}
cache >>= letterIdxBits
remain--
}
return b
}
// randomString accepts a number(10 for example) and returns a random string using simple but fairly safe random algorithm
func randomString(n int) string {
return string(random(n))
}

View File

@@ -4,7 +4,6 @@ import (
"bytes"
stdContext "context"
"errors"
"io"
"net"
"strconv"
"strings"
@@ -93,50 +92,6 @@ func (r *ConnectionValues) Reset() {
*r = (*r)[:0]
}
// UnderlineConnection is the underline connection, nothing to think about,
// it's used internally mostly but can be used for extreme cases with other libraries.
type UnderlineConnection interface {
// SetWriteDeadline sets the write deadline on the underlying network
// connection. After a write has timed out, the websocket state is corrupt and
// all future writes will return an error. A zero value for t means writes will
// not time out.
SetWriteDeadline(t time.Time) error
// SetReadDeadline sets the read deadline on the underlying network connection.
// After a read has timed out, the websocket connection state is corrupt and
// all future reads will return an error. A zero value for t means reads will
// not time out.
SetReadDeadline(t time.Time) error
// SetReadLimit sets the maximum size for a message read from the peer. If a
// message exceeds the limit, the connection sends a close frame to the peer
// and returns ErrReadLimit to the application.
SetReadLimit(limit int64)
// SetPongHandler sets the handler for pong messages received from the peer.
// The appData argument to h is the PONG frame application data. The default
// pong handler does nothing.
SetPongHandler(h func(appData string) error)
// SetPingHandler sets the handler for ping messages received from the peer.
// The appData argument to h is the PING frame application data. The default
// ping handler sends a pong to the peer.
SetPingHandler(h func(appData string) error)
// WriteControl writes a control message with the given deadline. The allowed
// message types are CloseMessage, PingMessage and PongMessage.
WriteControl(messageType int, data []byte, deadline time.Time) error
// WriteMessage is a helper method for getting a writer using NextWriter,
// writing the message and closing the writer.
WriteMessage(messageType int, data []byte) error
// ReadMessage is a helper method for getting a reader using NextReader and
// reading from that reader to a buffer.
ReadMessage() (messageType int, p []byte, err error)
// NextWriter returns a writer for the next message to send. The writer's Close
// method flushes the complete message to the network.
//
// There can be at most one open writer on a connection. NextWriter closes the
// previous writer if the application has not already done so.
NextWriter(messageType int) (io.WriteCloser, error)
// Close closes the underlying network connection without sending or waiting for a close frame.
Close() error
}
// -------------------------------------------------------------------------------------
// -------------------------------------------------------------------------------------
// -------------------------------Connection implementation-----------------------------
@@ -239,11 +194,12 @@ type (
// after the "On" events IF server's `Upgrade` is used,
// otherise you don't have to call it because the `Handler()` does it automatically.
Wait()
UnderlyingConn() *websocket.Conn
}
connection struct {
err error
underline UnderlineConnection
underline *websocket.Conn
config ConnectionConfig
defaultMessageType int
serializer *messageSerializer
@@ -281,11 +237,11 @@ var _ Connection = &connection{}
// WrapConnection wraps the underline websocket connection into a new iris websocket connection.
// The caller should call the `connection#Wait` (which blocks) to enable its read and write functionality.
func WrapConnection(underlineConn UnderlineConnection, cfg ConnectionConfig) Connection {
func WrapConnection(underlineConn *websocket.Conn, cfg ConnectionConfig) Connection {
return newConnection(underlineConn, cfg)
}
func newConnection(underlineConn UnderlineConnection, cfg ConnectionConfig) *connection {
func newConnection(underlineConn *websocket.Conn, cfg ConnectionConfig) *connection {
cfg = cfg.Validate()
c := &connection{
underline: underlineConn,
@@ -308,7 +264,7 @@ func newConnection(underlineConn UnderlineConnection, cfg ConnectionConfig) *con
return c
}
func newServerConnection(ctx context.Context, s *Server, underlineConn UnderlineConnection, id string) *connection {
func newServerConnection(ctx context.Context, s *Server, underlineConn *websocket.Conn, id string) *connection {
c := newConnection(underlineConn, ConnectionConfig{
EvtMessagePrefix: s.config.EvtMessagePrefix,
WriteTimeout: s.config.WriteTimeout,
@@ -334,6 +290,10 @@ func newServerConnection(ctx context.Context, s *Server, underlineConn Underline
return c
}
func (c *connection) UnderlyingConn() *websocket.Conn {
return c.underline
}
// Err is not nil if the upgrader failed to upgrade http to websocket connection.
func (c *connection) Err() error {
return c.err

View File

@@ -15,19 +15,6 @@ type (
// Receives one parameter which is the Connection
ConnectionFunc func(Connection)
// websocketRoomPayload is used as payload from the connection to the Server
websocketRoomPayload struct {
roomName string
connectionID string
}
// payloads, connection -> Server
websocketMessagePayload struct {
from string
to string
data []byte
}
// Server is the websocket Server's implementation.
//
// It listens for websocket clients (either from the javascript client-side or from any websocket implementation).
@@ -44,9 +31,9 @@ type (
// Use a route to serve this file on a specific path, i.e
// app.Any("/iris-ws.js", func(ctx iris.Context) { ctx.Write(mywebsocketServer.ClientSource) })
ClientSource []byte
connections map[string]*connection // key = the Connection ID.
rooms map[string][]string // by default a connection is joined to a room which has the connection id as its name
mu sync.RWMutex // for rooms and connections.
connections sync.Map // key = the Connection ID.
rooms map[string][]string // by default a connection is joined to a room which has the connection id as its name
mu sync.RWMutex // for rooms.
onConnectionListeners []ConnectionFunc
//connectionPool sync.Pool // sadly we can't make this because the websocket connection is live until is closed.
upgrader websocket.Upgrader
@@ -63,7 +50,7 @@ func New(cfg Config) *Server {
return &Server{
config: cfg,
ClientSource: bytes.Replace(ClientSource, []byte(DefaultEvtMessageKey), cfg.EvtMessagePrefix, -1),
connections: make(map[string]*connection),
connections: sync.Map{}, // ready-to-use, this is not necessary.
rooms: make(map[string][]string),
onConnectionListeners: make([]ConnectionFunc, 0),
upgrader: websocket.Upgrader{
@@ -132,19 +119,24 @@ func (s *Server) Upgrade(ctx context.Context) Connection {
}
func (s *Server) addConnection(c *connection) {
s.mu.Lock()
s.connections[c.id] = c
s.mu.Unlock()
s.connections.Store(c.id, c)
}
func (s *Server) getConnection(connID string) (*connection, bool) {
c, ok := s.connections[connID]
return c, ok
if cValue, ok := s.connections.Load(connID); ok {
// this cast is not necessary,
// we know that we always save a connection, but for good or worse let it be here.
if conn, ok := cValue.(*connection); ok {
return conn, ok
}
}
return nil, false
}
// wrapConnection wraps an underline connection to an iris websocket connection.
// It does NOT starts its writer, reader and event mux, the caller is responsible for that.
func (s *Server) handleConnection(ctx context.Context, websocketConn UnderlineConnection) *connection {
func (s *Server) handleConnection(ctx context.Context, websocketConn *websocket.Conn) *connection {
// use the config's id generator (or the default) to create a websocket client/connection id
cid := s.config.IDGenerator(ctx)
// create the new connection
@@ -282,27 +274,31 @@ func (s *Server) leave(roomName string, connID string) (left bool) {
return
}
// GetTotalConnections returns the number of total connections
// GetTotalConnections returns the number of total connections.
func (s *Server) GetTotalConnections() (n int) {
s.mu.RLock()
n = len(s.connections)
s.mu.RUnlock()
s.connections.Range(func(k, v interface{}) bool {
n++
return true
})
return
return n
}
// GetConnections returns all connections
func (s *Server) GetConnections() []Connection {
s.mu.RLock()
conns := make([]Connection, len(s.connections))
i := 0
for _, c := range s.connections {
conns[i] = c
i++
}
// GetConnections returns all connections.
func (s *Server) GetConnections() (conns []Connection) {
s.connections.Range(func(k, v interface{}) bool {
conn, ok := v.(*connection)
if !ok {
// if for some reason (should never happen), the value is not stored as *connection
// then stop the iteration and don't continue insertion of the result connections
// in order to avoid any issues while end-dev will try to iterate a nil entry.
return false
}
conns = append(conns, conn)
return true
})
s.mu.RUnlock()
return conns
return
}
// GetConnection returns single connection
@@ -317,21 +313,19 @@ func (s *Server) GetConnection(connID string) Connection {
// GetConnectionsByRoom returns a list of Connection
// which are joined to this room.
func (s *Server) GetConnectionsByRoom(roomName string) []Connection {
var conns []Connection
s.mu.RLock()
func (s *Server) GetConnectionsByRoom(roomName string) (conns []Connection) {
if connIDs, found := s.rooms[roomName]; found {
for _, connID := range connIDs {
// existence check is not necessary here.
if conn, ok := s.connections[connID]; ok {
conns = append(conns, conn)
if cValue, ok := s.connections.Load(connID); ok {
if conn, ok := cValue.(*connection); ok {
conns = append(conns, conn)
}
}
}
}
s.mu.RUnlock()
return conns
return
}
// emitMessage is the main 'router' of the messages coming from the connection
@@ -364,20 +358,32 @@ func (s *Server) emitMessage(from, to string, data []byte) {
}
}
} else {
s.mu.RLock()
// it suppose to send the message to all opened connections or to all except the sender.
for _, conn := range s.connections {
if to != All && to != conn.id { // if it's not suppose to send to all connections (including itself)
if to == Broadcast && from == conn.id { // if broadcast to other connections except this
// here we do the opossite of previous block,
// just skip this connection when it's suppose to send the message to all connections except the sender.
continue
}
s.connections.Range(func(k, v interface{}) bool {
connID, ok := k.(string)
if !ok {
// should never happen.
return true
}
conn.writeDefault(data)
}
s.mu.RUnlock()
if to != All && to != connID { // if it's not suppose to send to all connections (including itself)
if to == Broadcast && from == connID { // if broadcast to other connections except this
// here we do the opossite of previous block,
// just skip this connection when it's suppose to send the message to all connections except the sender.
return true
}
}
// not necessary cast.
conn, ok := v.(*connection)
if ok {
// send to the client(s) when the top validators passed
conn.writeDefault(data)
}
return ok
})
}
}
@@ -402,9 +408,7 @@ func (s *Server) Disconnect(connID string) (err error) {
// close the underline connection and return its error, if any.
err = conn.underline.Close()
s.mu.Lock()
delete(s.connections, conn.id)
s.mu.Unlock()
s.connections.Delete(connID)
}
return