
release version 12.2.10

Gerasimos (Makis) Maropoulos
2024-01-18 03:11:54 +02:00
parent 12546322eb
commit 113ce190e6
31 changed files with 314 additions and 208 deletions

View File

@@ -2,6 +2,7 @@
 package host_test
 
 import (
+    stdContext "context"
     "crypto/tls"
     "net"
     "net/url"
@@ -30,6 +31,7 @@ func TestProxy(t *testing.T) {
         MinVersion: tls.VersionTLS13,
     }
     proxy := host.NewProxy("", u, config)
+    proxy.Configure(host.NonBlocking())
     addr := &net.TCPAddr{
         IP: net.IPv4(127, 0, 0, 1),
@@ -41,10 +43,15 @@ func TestProxy(t *testing.T) {
         t.Fatalf("%v while creating listener", err)
     }
-    go proxy.Serve(listener) // should be localhost/127.0.0.1:80 but travis throws permission denied.
-    <-time.After(time.Second)
-    t.Log(listener.Addr().String())
+    // non-blocking (see above).
+    proxy.Serve(listener)
+
+    ctx, cancelFunc := stdContext.WithTimeout(stdContext.Background(), 15*time.Second)
+    defer cancelFunc()
+    // Wait for up to 15 seconds or until the proxy is ready to serve or a serve failure.
+    proxy.Wait(ctx)
+    t.Log(listener.Addr().String())
 
     app := iris.New()
@@ -60,7 +67,7 @@ func TestProxy(t *testing.T) {
         ctx.WriteString(unexpectedRoute)
     })
-    l, err := net.Listen("tcp", "localhost:4444") // should be localhost/127.0.0.1:443 but travis throws permission denied.
+    l, err := net.Listen("tcp", "localhost:4444")
     if err != nil {
         t.Fatalf("%v while creating tcp4 listener for new tls local test listener", err)
     }
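
The change above replaces a fixed one-second sleep with proxy.Wait(ctx), which blocks until a TCP dial to the listener's address succeeds or the context expires. A minimal, self-contained sketch of that dial-until-ready idea, assuming nothing beyond the standard library; waitReady is a hypothetical helper, not part of this commit, and the address and retry interval are placeholders:

package main

import (
    "context"
    "fmt"
    "net"
    "time"
)

// waitReady dials addr until the server accepts a connection or ctx
// expires. It mirrors what the commit's Waiter.tryConnect does,
// minus the exponential backoff.
func waitReady(ctx context.Context, addr string) error {
    for {
        conn, err := net.Dial("tcp", addr)
        if err == nil {
            conn.Close() // the server accepted a connection: it is up.
            return nil
        }
        select {
        case <-ctx.Done():
            return ctx.Err() // deadline or cancellation wins.
        case <-time.After(100 * time.Millisecond): // retry shortly.
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
    defer cancel()
    fmt.Println(waitReady(ctx, "127.0.0.1:4444")) // address is illustrative.
}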

View File

@@ -25,6 +25,13 @@ import (
 // Look the `Configure` func for more.
 type Configurator func(su *Supervisor)
+
+// NonBlocking sets the server to non-blocking mode. Use its `Wait` method to wait for server to be up and running.
+func NonBlocking() Configurator {
+    return func(su *Supervisor) {
+        su.nonBlocking = true
+    }
+}
 
 // Supervisor is the wrapper and the manager for a compatible server
 // and it's relative actions, called Tasks.
 //
@@ -34,14 +41,12 @@ type Supervisor struct {
     // FriendlyAddr can be set to customize the "Now Listening on: {FriendlyAddr}".
     FriendlyAddr string // e.g mydomain.com instead of :443 when AutoTLS is used, see `WriteStartupLogOnServe` task.
     disableHTTP1ToHTTP2Redirection bool
-    closedManually uint32 // future use, accessed atomically (non-zero means we've called the Shutdown)
-    closedByInterruptHandler uint32 // non-zero means that the end-developer interrupted it by-purpose.
-    manuallyTLS bool // we need that in order to determinate what to output on the console before the server begin.
-    autoTLS bool
-    shouldWait int32 // non-zero means that the host should wait for unblocking
-    unblockChan chan struct{}
-    mu sync.Mutex
+    closedByInterruptHandler uint32 // non-zero means that the end-developer interrupted it by-purpose.
+    manuallyTLS bool // we need that in order to determinate what to output on the console before the server begin.
+    autoTLS bool
+    mu sync.RWMutex
 
     onServe []func(TaskHost)
     // IgnoreErrors should contains the errors that should be ignored
@@ -73,6 +78,10 @@ type Supervisor struct {
     // If more than zero then tcp keep alive listener is attached instead of the simple TCP listener.
     // See `iris.Configuration.KeepAlive`
     KeepAlive time.Duration
+
+    address string
+    nonBlocking bool
+    waiter *Waiter
 }
 
 // New returns a new host supervisor
@@ -83,10 +92,12 @@ type Supervisor struct {
 // It has its own flow, which means that you can prevent
 // to return and exit and restore the flow too.
 func New(srv *http.Server) *Supervisor {
-    return &Supervisor{
-        Server: srv,
-        unblockChan: make(chan struct{}, 1),
+    su := &Supervisor{
+        Server: srv,
     }
+
+    su.waiter = NewWaiter(7, su.getAddress)
+    return su
 }
 
 // Configure accepts one or more `Configurator`.
@@ -113,36 +124,6 @@ func (su *Supervisor) NoRedirect() {
     su.disableHTTP1ToHTTP2Redirection = true
 }
 
-// DeferFlow defers the flow of the exeuction,
-// i.e: when server should return error and exit
-// from app, a DeferFlow call inside a Task
-// can wait for a `RestoreFlow` to exit or not exit if
-// host's server is "fixed".
-//
-// See `RestoreFlow` too.
-func (su *Supervisor) DeferFlow() {
-    atomic.StoreInt32(&su.shouldWait, 1)
-}
-
-// RestoreFlow restores the flow of the execution,
-// if called without a `DeferFlow` call before
-// then it does nothing.
-// See tests to understand how that can be useful on specific cases.
-//
-// See `DeferFlow` too.
-func (su *Supervisor) RestoreFlow() {
-    if su.isWaiting() {
-        atomic.StoreInt32(&su.shouldWait, 0)
-        su.mu.Lock()
-        su.unblockChan <- struct{}{}
-        su.mu.Unlock()
-    }
-}
-
-func (su *Supervisor) isWaiting() bool {
-    return atomic.LoadInt32(&su.shouldWait) != 0
-}
 
 func (su *Supervisor) newListener() (net.Listener, error) {
     var (
         l net.Listener
@@ -198,14 +179,28 @@ func (su *Supervisor) validateErr(err error) error {
 }
 
 func (su *Supervisor) notifyErr(err error) {
     err = su.validateErr(err)
-    if err != nil {
-        su.mu.Lock()
-        for _, f := range su.onErr {
-            go f(err)
-        }
-        su.mu.Unlock()
-    }
+    if err == nil {
+        return
+    }
+
+    su.mu.Lock()
+    for _, f := range su.onErr {
+        go f(err)
+    }
+    su.mu.Unlock()
 }
+
+func (su *Supervisor) getAddress() string {
+    su.mu.RLock()
+    addr := su.address
+    su.mu.RUnlock()
+    return addr
+}
+
+func (su *Supervisor) setAddress(addr string) {
+    su.mu.Lock()
+    su.address = addr
+    su.mu.Unlock()
+}
 
 // RegisterOnServe registers a function to call on
@@ -224,27 +219,36 @@ func (su *Supervisor) notifyServe(host TaskHost) {
     su.mu.Unlock()
 }
 
-// Remove all channels, do it with events
-// or with channels but with a different channel on each task proc
-// I don't know channels are not so safe, when go func and race risk..
-// so better with callbacks....
 func (su *Supervisor) supervise(blockFunc func() error) error {
     host := createTaskHost(su)
     su.notifyServe(host)
     atomic.StoreUint32(&su.closedByInterruptHandler, 0)
-    atomic.StoreUint32(&su.closedManually, 0)
-    err := blockFunc()
-    su.notifyErr(err)
-    if su.isWaiting() {
-        for range su.unblockChan {
-            break
-        }
-    }
-    return su.validateErr(err)
+
+    if su.nonBlocking {
+        go func() {
+            err := blockFunc()
+            if err != nil {
+                su.waiter.Fail(err)
+            }
+
+            err = su.validateErr(err)
+            su.notifyErr(err)
+        }()
+
+        return nil
+    }
+
+    err := blockFunc()
+    err = su.validateErr(err)
+    su.notifyErr(err)
+    return err
 }
+
+// Wait blocks until server is up and running or a serve failure.
+func (su *Supervisor) Wait(ctx context.Context) error {
+    return su.waiter.Wait(ctx)
+}
 
 // Serve accepts incoming connections on the Listener l, creating a
@@ -259,7 +263,11 @@ func (su *Supervisor) supervise(blockFunc func() error) error {
 // Serve always returns a non-nil error. After Shutdown or Close, the
 // returned error is http.ErrServerClosed.
 func (su *Supervisor) Serve(l net.Listener) error {
-    return su.supervise(func() error { return su.Server.Serve(l) })
+    su.setAddress(l.Addr().String())
+
+    return su.supervise(func() error {
+        return su.Server.Serve(l)
+    })
 }
 
 // ListenAndServe listens on the TCP network address addr
@@ -502,7 +510,6 @@ func (su *Supervisor) RegisterOnShutdown(cb func()) {
 // separately notify such long-lived connections of shutdown and wait
 // for them to close, if desired.
 func (su *Supervisor) Shutdown(ctx context.Context) error {
-    atomic.StoreUint32(&su.closedManually, 1) // future-use
     if ctx == nil {
         ctx = context.Background()
     }
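
Taken together, these changes replace the old DeferFlow/RestoreFlow channel dance with an explicit non-blocking mode: Serve records the listener address, supervise runs the blocking function on a goroutine and reports startup failures to the waiter, and Wait polls the recorded address until it answers. A hedged usage sketch, assuming the import path github.com/kataras/iris/v12/core/host and only the APIs visible in this diff; the address and timeout are placeholders:

package main

import (
    "context"
    "log"
    "net"
    "net/http"
    "time"

    "github.com/kataras/iris/v12/core/host"
)

func main() {
    su := host.New(&http.Server{
        Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            w.Write([]byte("ok"))
        }),
    })
    su.Configure(host.NonBlocking()) // Serve will no longer block.

    l, err := net.Listen("tcp", "127.0.0.1:8080")
    if err != nil {
        log.Fatal(err)
    }
    _ = su.Serve(l) // returns nil immediately in non-blocking mode.

    ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
    defer cancel()
    if err := su.Wait(ctx); err != nil { // up and running, or a serve failure.
        log.Fatal(err)
    }

    // ... use the server, then shut it down gracefully.
    su.Shutdown(context.Background())
}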

View File

@@ -4,10 +4,7 @@ package host
 import (
     "context"
     "fmt"
-    "log"
-    "net"
     "net/http"
-    "os"
     "time"
 )
@@ -38,75 +35,3 @@ func ExampleSupervisor_RegisterOnError() {
     // http: Server closed
     // http: Server closed
 }
-
-type myTestTask struct {
-    restartEvery time.Duration
-    maxRestarts  int
-    logger       *log.Logger
-}
-
-func (m myTestTask) OnServe(host TaskHost) {
-    host.Supervisor.DeferFlow() // don't exit on underline server's Shutdown.
-
-    ticker := time.NewTicker(m.restartEvery)
-    defer ticker.Stop()
-
-    rans := 0
-    for range ticker.C {
-        exitAfterXRestarts := m.maxRestarts
-        if rans == exitAfterXRestarts {
-            m.logger.Println("exit")
-            ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
-            _ = host.Supervisor.Shutdown(ctx) // total shutdown
-            host.Supervisor.RestoreFlow()     // free to exit (if shutdown)
-            cancel()
-            return
-        }
-
-        rans++
-        m.logger.Println(fmt.Sprintf("closed %d times", rans))
-        host.Shutdown(context.TODO())
-
-        startDelay := 2 * time.Second
-        time.AfterFunc(startDelay, func() {
-            m.logger.Println("restart")
-            if err := host.Serve(); err != nil { // restart
-                panic(err)
-            }
-        })
-    }
-}
-
-func ExampleSupervisor_RegisterOnServe() {
-    h := New(&http.Server{
-        Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-        }),
-    })
-
-    logger := log.New(os.Stdout, "Supervisor: ", 0)
-
-    mytask := myTestTask{
-        restartEvery: 3 * time.Second,
-        maxRestarts:  2,
-        logger:       logger,
-    }
-    h.RegisterOnServe(mytask.OnServe)
-
-    ln, err := net.Listen("tcp4", ":9394")
-    if err != nil {
-        panic(err.Error())
-    }
-
-    logger.Println("server started...")
-    h.Serve(ln)
-
-    // Output:
-    // Supervisor: server started...
-    // Supervisor: closed 1 times
-    // Supervisor: restart
-    // Supervisor: closed 2 times
-    // Supervisor: restart
-    // Supervisor: exit
-}

View File

@@ -35,7 +35,7 @@ func newTester(t *testing.T, baseURL string, handler http.Handler) *httpexpect.E
     BaseURL: baseURL,
     Client: &http.Client{
         Transport: transporter,
-        Jar:       httpexpect.NewJar(),
+        Jar:       httpexpect.NewCookieJar(),
     },
     Reporter: httpexpect.NewAssertReporter(t),
 }

View File

@@ -94,10 +94,10 @@ func WriteStartupLogOnServe(w io.Writer) func(TaskHost) {
 // This function should be registered on Interrupt.
 func ShutdownOnInterrupt(su *Supervisor, shutdownTimeout time.Duration) func() {
     return func() {
-        ctx, cancel := context.WithTimeout(context.TODO(), shutdownTimeout)
+        ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
         defer cancel()
 
         su.shutdownOnInterrupt(ctx)
-        su.RestoreFlow()
     }
 }

core/host/waiter.go (new file, +134 lines)
View File

@@ -0,0 +1,134 @@
+package host
+
+import (
+    "context"
+    "fmt"
+    "math"
+    "net"
+    "sync"
+    "time"
+)
+
+// Waiter is a helper for waiting for a server to be up and running.
+type Waiter struct {
+    defaultMaxRetries int
+    addressFunc       func() string
+
+    failure error // or runError for app.Run.
+    mu      sync.RWMutex
+}
+
+// NewWaiter returns a new Waiter.
+func NewWaiter(defaultMaxRetries int, addressFunc func() string) *Waiter {
+    if defaultMaxRetries <= 0 {
+        defaultMaxRetries = 7 // 256 seconds max.
+    }
+
+    return &Waiter{
+        defaultMaxRetries: defaultMaxRetries,
+        addressFunc:       addressFunc,
+    }
+}
+
+// Wait blocks the main goroutine until the application is up and running.
+func (w *Waiter) Wait(ctx context.Context) error {
+    // First check if there is an error already from Done.
+    if err := w.getFailure(); err != nil {
+        return err
+    }
+
+    // Set the base for exponential backoff.
+    base := 2.0
+
+    // Get the maximum number of retries by context or force to default max retries (e.g. 7).
+    var maxRetries int
+    // Get the deadline of the context.
+    if deadline, ok := ctx.Deadline(); ok {
+        now := time.Now()
+        timeout := deadline.Sub(now)
+
+        maxRetries = getMaxRetries(timeout, base)
+    } else {
+        maxRetries = w.defaultMaxRetries
+    }
+
+    // Set the initial retry interval.
+    retryInterval := time.Second
+
+    return w.tryConnect(ctx, w.addressFunc, maxRetries, retryInterval, base)
+}
+
+// getMaxRetries calculates the maximum number of retries from the retry interval and the base.
+func getMaxRetries(retryInterval time.Duration, base float64) int {
+    // Convert the retry interval to seconds.
+    seconds := retryInterval.Seconds()
+    // Apply the inverse formula.
+    retries := math.Log(seconds)/math.Log(base) - 1
+    return int(math.Round(retries))
+}
+
+// tryConnect tries to connect to the server with the given context and retry parameters.
+func (w *Waiter) tryConnect(ctx context.Context, addressFunc func() string, maxRetries int, retryInterval time.Duration, base float64) error {
+    // Try to connect to the server in a loop.
+    for i := 0; i < maxRetries; i++ {
+        // Check the context before each attempt.
+        select {
+        case <-ctx.Done():
+            // Context is canceled, return the context error.
+            return ctx.Err()
+        default:
+            address := addressFunc() // Get this server's listening address.
+            if address == "" {
+                i-- // Note that this may be modified at another go routine of the serve method. So it may be empty at first chance. So retry fetching the VHost every 1 second.
+                time.Sleep(time.Second)
+                continue
+            }
+
+            // Context is not canceled, proceed with the attempt.
+            conn, err := net.Dial("tcp", address)
+            if err == nil {
+                // Connection successful, close the connection and return nil.
+                conn.Close()
+                return nil // exit.
+            } // ignore error.
+
+            // Connection failed, wait for the retry interval and try again.
+            time.Sleep(retryInterval)
+            // After each failed attempt, check the server Run's error again.
+            if err := w.getFailure(); err != nil {
+                return err
+            }
+
+            // Increase the retry interval by the base raised to the power of the number of attempts.
+            /*
+                0    2 seconds
+                1    4 seconds
+                2    8 seconds
+                3    ~16 seconds
+                4    ~32 seconds
+                5    ~64 seconds
+                6    ~128 seconds
+                7    ~256 seconds
+                8    ~512 seconds
+                ...
+            */
+            retryInterval = time.Duration(math.Pow(base, float64(i+1))) * time.Second
+        }
+    }
+
+    // All attempts failed, return an error.
+    return fmt.Errorf("failed to connect to the server after %d retries", maxRetries)
+}
+
+// Fail is called by the server's Run method when the server failed to start.
+func (w *Waiter) Fail(err error) {
+    w.mu.Lock()
+    w.failure = err
+    w.mu.Unlock()
+}
+
+func (w *Waiter) getFailure() error {
+    w.mu.RLock()
+    err := w.failure
+    w.mu.RUnlock()
+    return err
+}
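
The retry table in tryConnect and the inverse formula in getMaxRetries agree: retry i sleeps base^(i+1) seconds, so a context deadline of S seconds affords roughly log_base(S) - 1 attempts. A quick, self-contained check of that arithmetic, with the time.Duration parameter simplified to plain seconds (maxRetriesFor is an illustrative stand-in for getMaxRetries, not part of this commit):

package main

import (
    "fmt"
    "math"
)

// maxRetriesFor mirrors getMaxRetries above, taking seconds directly.
func maxRetriesFor(seconds, base float64) int {
    return int(math.Round(math.Log(seconds)/math.Log(base) - 1))
}

func main() {
    fmt.Println(maxRetriesFor(15, 2))  // 3: log2(15) ≈ 3.91, minus 1 ≈ 2.91, rounds to 3.
    fmt.Println(maxRetriesFor(256, 2)) // 7: log2(256) = 8, minus 1 = 7, the default ("256 seconds max").
    // With 3 retries the sleeps are 2s + 4s + 8s ≈ 14s, just inside a 15s deadline,
    // which matches the 15-second budget used by proxy.Wait in the test above.
}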