1
0
mirror of https://github.com/kataras/iris.git synced 2026-01-09 21:15:56 +00:00

Add notes for the new lead maintainer of the open-source iris project and align with @get-ion/ion by @hiveminded

Former-commit-id: da4f38eb9034daa49446df3ee529423b98f9b331
This commit is contained in:
kataras
2017-07-10 18:32:42 +03:00
parent 2d4c2779a7
commit 9f85b74fc9
344 changed files with 4842 additions and 5174 deletions

View File

@@ -1,7 +1,3 @@
// Copyright 2017 Gerasimos Maropoulos, ΓΜ. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package host
import (
@@ -11,7 +7,7 @@ import (
"net/url"
"strings"
"github.com/kataras/iris/core/nettools"
"github.com/kataras/iris/core/netutil"
)
func singleJoiningSlash(a, b string) string {
@@ -52,7 +48,7 @@ func ProxyHandler(target *url.URL) *httputil.ReverseProxy {
}
p := &httputil.ReverseProxy{Director: director}
if nettools.IsLoopbackHost(target.Host) {
if netutil.IsLoopbackHost(target.Host) {
transport := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}

View File

@@ -1,153 +0,0 @@
// Copyright 2017 Gerasimos Maropoulos, ΓΜ. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package host
import (
"sync/atomic"
)
type task struct {
runner TaskRunner
proc TaskProcess
// atomic-accessed, if != 0 means that is already
// canceled before it ever ran, this happens to interrupt handlers too.
alreadyCanceled int32
Cancel func()
}
func (t *task) isCanceled() bool {
return atomic.LoadInt32(&t.alreadyCanceled) != 0
}
// Scheduler is a type of an event emmiter.
// Can register a specific task for a specific event
// when host is starting the server or host is interrupted by CTRL+C/CMD+C.
// It's being used internally on host supervisor.
type Scheduler struct {
onServeTasks []*task
onInterruptTasks []*task
}
// TaskCancelFunc cancels a Task when called.
type TaskCancelFunc func()
// Schedule schedule/registers a Task,
// it will be executed/run to when host starts the server
// or when host is interrupted by CTRL+C/CMD+C based on the TaskRunner type.
//
// See `OnInterrupt` and `ScheduleFunc` too.
func (s *Scheduler) Schedule(runner TaskRunner) TaskCancelFunc {
t := new(task)
t.runner = runner
t.Cancel = func() {
// it's not running yet, so if canceled now
// set to already canceled to not run it at all.
atomic.StoreInt32(&t.alreadyCanceled, 1)
}
if _, ok := runner.(OnInterrupt); ok {
s.onInterruptTasks = append(s.onInterruptTasks, t)
} else {
s.onServeTasks = append(s.onServeTasks, t)
}
return func() {
t.Cancel()
}
}
// ScheduleFunc schedule/registers a task function,
// it will be executed/run to when host starts the server
// or when host is interrupted by CTRL+C/CMD+C based on the TaskRunner type.
//
// See `OnInterrupt` and `ScheduleFunc` too.
func (s *Scheduler) ScheduleFunc(runner func(TaskProcess)) TaskCancelFunc {
return s.Schedule(TaskRunnerFunc(runner))
}
func cancelTasks(tasks []*task) {
for _, t := range tasks {
if atomic.LoadInt32(&t.alreadyCanceled) != 0 {
continue // canceled, don't run it
}
go t.Cancel()
}
}
// CancelOnServeTasks cancels all tasks that are scheduled to run when
// host is starting the server, when the server is alive and online.
func (s *Scheduler) CancelOnServeTasks() {
cancelTasks(s.onServeTasks)
}
// CancelOnInterruptTasks cancels all tasks that are scheduled to run when
// host is being interrupted by CTRL+C/CMD+C, when the server is alive and online as well.
func (s *Scheduler) CancelOnInterruptTasks() {
cancelTasks(s.onInterruptTasks)
}
func runTaskNow(task *task, host TaskHost) {
proc := newTaskProcess(host)
task.proc = proc
task.Cancel = func() {
proc.canceledChan <- struct{}{}
}
go task.runner.Run(proc)
}
func runTasks(tasks []*task, host TaskHost) {
for _, t := range tasks {
if t.isCanceled() {
continue
}
runTaskNow(t, host)
}
}
func (s *Scheduler) runOnServe(host TaskHost) {
runTasks(s.onServeTasks, host)
}
func (s *Scheduler) runOnInterrupt(host TaskHost) {
runTasks(s.onInterruptTasks, host)
}
func (s *Scheduler) visit(visitor func(*task)) {
for _, t := range s.onServeTasks {
visitor(t)
}
for _, t := range s.onInterruptTasks {
visitor(t)
}
}
func (s *Scheduler) notifyShutdown() {
s.visit(func(t *task) {
go func() {
t.proc.Host().doneChan <- struct{}{}
}()
})
}
func (s *Scheduler) notifyErr(err error) {
s.visit(func(t *task) {
go func() {
t.proc.Host().errChan <- err
}()
})
}
// CopyTo copies all tasks from "s" to "to" Scheduler.
// It doesn't care about anything else.
func (s *Scheduler) CopyTo(to *Scheduler) {
s.visit(func(t *task) {
rnner := t.runner
to.Schedule(rnner)
})
}

View File

@@ -1,80 +0,0 @@
// white-box testing
package host
import (
"context"
"fmt"
"log"
"net"
"net/http"
"os"
"time"
)
type myTestTask struct {
delay time.Duration
logger *log.Logger
}
func (m myTestTask) Run(proc TaskProcess) {
ticker := time.NewTicker(m.delay)
defer ticker.Stop()
rans := 0
for {
select {
case _, ok := <-ticker.C:
{
if !ok {
m.logger.Println("ticker issue, closed channel, exiting from this task...")
return
}
rans++
m.logger.Println(fmt.Sprintf("%d", rans))
}
case <-proc.Done():
{
m.logger.Println("canceled, exiting from task AND SHUTDOWN the server...")
proc.Host().Shutdown(context.TODO())
return
}
}
}
}
func SchedulerSchedule() {
h := New(&http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
}),
})
logger := log.New(os.Stdout, "Supervisor: ", 0)
delaySeconds := 2
mytask := myTestTask{
delay: time.Duration(delaySeconds) * time.Second,
logger: logger,
}
cancel := h.Schedule(mytask)
ln, err := net.Listen("tcp4", ":9090")
if err != nil {
panic(err.Error())
}
logger.Println("server started...")
logger.Println("we will cancel the task after 2 runs (the third will be canceled)")
cancelAfterRuns := 2
time.AfterFunc(time.Duration(delaySeconds*cancelAfterRuns+(delaySeconds/2))*time.Second, func() {
cancel()
logger.Println("cancel sent")
})
h.Serve(ln)
// Output:
// Supervisor: server started...
// Supervisor: we will cancel the task after 2 runs (the third will be canceled)
// Supervisor: 1
// Supervisor: 2
// Supervisor: cancel sent
// Supervisor: canceled, exiting from task AND SHUTDOWN the server...
}

View File

@@ -1,7 +1,3 @@
// Copyright 2017 Gerasimos Maropoulos, ΓΜ. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package host
import (
@@ -9,14 +5,11 @@ import (
"crypto/tls"
"net"
"net/http"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"github.com/kataras/iris/core/errors"
"github.com/kataras/iris/core/nettools"
"github.com/kataras/iris/core/netutil"
"golang.org/x/crypto/acme/autocert"
)
@@ -25,16 +18,17 @@ import (
//
// Interfaces are separated to return relative functionality to them.
type Supervisor struct {
Scheduler
server *http.Server
Server *http.Server
closedManually int32 // future use, accessed atomically (non-zero means we've called the Shutdown)
shouldWait int32 // non-zero means that the host should wait for unblocking
unblockChan chan struct{}
shutdownChan chan struct{}
errChan chan error
manuallyTLS bool // we need that in order to determinate what to output on the console before the server begin.
shouldWait int32 // non-zero means that the host should wait for unblocking
unblockChan chan struct{}
mu sync.Mutex
onServe []func(TaskHost)
onErr []func(error)
onShutdown []func()
}
// New returns a new host supervisor
@@ -46,10 +40,8 @@ type Supervisor struct {
// to return and exit and restore the flow too.
func New(srv *http.Server) *Supervisor {
return &Supervisor{
server: srv,
unblockChan: make(chan struct{}, 1),
shutdownChan: make(chan struct{}),
errChan: make(chan error),
Server: srv,
unblockChan: make(chan struct{}, 1),
}
}
@@ -83,78 +75,73 @@ func (su *Supervisor) isWaiting() bool {
return atomic.LoadInt32(&su.shouldWait) != 0
}
// Done is being received when in server Shutdown.
// This can be used to gracefully shutdown connections that have
// undergone NPN/ALPN protocol upgrade or that have been hijacked.
// This function should start protocol-specific graceful shutdown,
// but should not wait for shutdown to complete.
func (su *Supervisor) Done() <-chan struct{} {
return su.shutdownChan
func (su *Supervisor) newListener() (net.Listener, error) {
// this will not work on "unix" as network
// because UNIX doesn't supports the kind of
// restarts we may want for the server.
//
// User still be able to call .Serve instead.
l, err := netutil.TCPKeepAlive(su.Server.Addr)
if err != nil {
return nil, err
}
// here we can check for sure, without the need of the supervisor's `manuallyTLS` field.
if netutil.IsTLS(su.Server) {
// means tls
tlsl := tls.NewListener(l, su.Server.TLSConfig)
return tlsl, nil
}
return l, nil
}
// Err refences to the return value of Server's .Serve, not the server's specific error logger.
func (su *Supervisor) Err() <-chan error {
return su.errChan
}
func (su *Supervisor) notifyShutdown() {
go func() {
su.shutdownChan <- struct{}{}
}()
su.Scheduler.notifyShutdown()
// RegisterOnError registers a function to call when errors occured by the underline http server.
func (su *Supervisor) RegisterOnError(cb func(error)) {
su.mu.Lock()
su.onErr = append(su.onErr, cb)
su.mu.Unlock()
}
func (su *Supervisor) notifyErr(err error) {
// if err == http.ErrServerClosed {
// su.notifyShutdown()
// return
// }
go func() {
su.errChan <- err
}()
su.Scheduler.notifyErr(err)
su.mu.Lock()
for _, f := range su.onErr {
go f(err)
}
su.mu.Unlock()
}
// RegisterOnServe registers a function to call on
// Serve/ListenAndServe/ListenAndServeTLS/ListenAndServeAutoTLS.
func (su *Supervisor) RegisterOnServe(cb func(TaskHost)) {
su.mu.Lock()
su.onServe = append(su.onServe, cb)
su.mu.Unlock()
}
func (su *Supervisor) notifyServe(host TaskHost) {
su.mu.Lock()
for _, f := range su.onServe {
go f(host)
}
su.mu.Unlock()
}
/// TODO:
// Remove all channels, do it with events
// or with channels but with a different channel on each task proc
// I don't know channels are not so safe, when go func and race risk..
// so better with callbacks....
func (su *Supervisor) supervise(blockFunc func() error) error {
// println("Running Serve from Supervisor")
// su.server: in order to Serve and Shutdown the underline server and no re-run the supervisors when .Shutdown -> .Serve.
// su.GetBlocker: set the Block() and Unblock(), which are checked after a shutdown or error.
// su.GetNotifier: only one supervisor is allowed to be notified about Close/Shutdown and Err.
// su.log: set this builder's logger in order to supervisor to be able to share a common logger.
host := createTaskHost(su)
// run the list of supervisors in different go-tasks by-design.
su.Scheduler.runOnServe(host)
if len(su.Scheduler.onInterruptTasks) > 0 {
// this can't be moved to the task interrupt's `Run` function
// because it will not catch more than one ctrl/cmd+c, so
// we do it here. These tasks are canceled already too.
go func() {
ch := make(chan os.Signal, 1)
signal.Notify(ch,
// kill -SIGINT XXXX or Ctrl+c
os.Interrupt,
syscall.SIGINT, // register that too, it should be ok
// os.Kill is equivalent with the syscall.SIGKILL
os.Kill,
syscall.SIGKILL, // register that too, it should be ok
// kill -SIGTERM XXXX
syscall.SIGTERM,
)
select {
case <-ch:
su.Scheduler.runOnInterrupt(host)
}
}()
}
su.notifyServe(host)
tryStartInterruptNotifier()
err := blockFunc()
su.notifyErr(err)
@@ -172,26 +159,6 @@ func (su *Supervisor) supervise(blockFunc func() error) error {
return err // start the server
}
func (su *Supervisor) newListener() (net.Listener, error) {
// this will not work on "unix" as network
// because UNIX doesn't supports the kind of
// restarts we may want for the server.
//
// User still be able to call .Serve instead.
l, err := nettools.TCPKeepAlive(su.server.Addr)
if err != nil {
return nil, err
}
if nettools.IsTLS(su.server) {
// means tls
tlsl := tls.NewListener(l, su.server.TLSConfig)
return tlsl, nil
}
return l, nil
}
// Serve accepts incoming connections on the Listener l, creating a
// new service goroutine for each. The service goroutines read requests and
// then call su.server.Handler to reply to them.
@@ -204,7 +171,8 @@ func (su *Supervisor) newListener() (net.Listener, error) {
// Serve always returns a non-nil error. After Shutdown or Close, the
// returned error is http.ErrServerClosed.
func (su *Supervisor) Serve(l net.Listener) error {
return su.supervise(func() error { return su.server.Serve(l) })
return su.supervise(func() error { return su.Server.Serve(l) })
}
// ListenAndServe listens on the TCP network address addr
@@ -240,8 +208,8 @@ func (su *Supervisor) ListenAndServeTLS(certFile string, keyFile string) error {
}
setupHTTP2(cfg)
su.server.TLSConfig = cfg
su.Server.TLSConfig = cfg
su.manuallyTLS = true
return su.ListenAndServe()
}
@@ -256,10 +224,33 @@ func (su *Supervisor) ListenAndServeAutoTLS() error {
cfg := new(tls.Config)
cfg.GetCertificate = autoTLSManager.GetCertificate
setupHTTP2(cfg)
su.server.TLSConfig = cfg
su.Server.TLSConfig = cfg
su.manuallyTLS = true
return su.ListenAndServe()
}
// RegisterOnShutdown registers a function to call on Shutdown.
// This can be used to gracefully shutdown connections that have
// undergone NPN/ALPN protocol upgrade or that have been hijacked.
// This function should start protocol-specific graceful shutdown,
// but should not wait for shutdown to complete.
func (su *Supervisor) RegisterOnShutdown(cb func()) {
// when go1.9: replace the following lines with su.Server.RegisterOnShutdown(f)
su.mu.Lock()
su.onShutdown = append(su.onShutdown, cb)
su.mu.Unlock()
}
func (su *Supervisor) notifyShutdown() {
// when go1.9: remove the lines below
su.mu.Lock()
for _, f := range su.onShutdown {
go f()
}
su.mu.Unlock()
// end
}
// Shutdown gracefully shuts down the server without interrupting any
// active connections. Shutdown works by first closing all open
// listeners, then closing all idle connections, and then waiting
@@ -272,9 +263,7 @@ func (su *Supervisor) ListenAndServeAutoTLS() error {
// separately notify such long-lived connections of shutdown and wait
// for them to close, if desired.
func (su *Supervisor) Shutdown(ctx context.Context) error {
// println("Running Shutdown from Supervisor")
atomic.AddInt32(&su.closedManually, 1) // future-use
su.notifyShutdown()
return su.server.Shutdown(ctx)
return su.Server.Shutdown(ctx)
}

View File

@@ -0,0 +1,118 @@
// white-box testing
package host
import (
"context"
"fmt"
"log"
"net"
"net/http"
"os"
"time"
)
func ExampleSupervisor_RegisterOnError() {
su := New(&http.Server{Addr: ":8273", Handler: http.DefaultServeMux})
su.RegisterOnError(func(err error) {
fmt.Println(err.Error())
})
su.RegisterOnError(func(err error) {
fmt.Println(err.Error())
})
su.RegisterOnError(func(err error) {
fmt.Println(err.Error())
})
go su.ListenAndServe()
time.Sleep(1 * time.Second)
su.Shutdown(context.TODO())
time.Sleep(1 * time.Second)
// Output:
// http: Server closed
// http: Server closed
// http: Server closed
}
type myTestTask struct {
restartEvery time.Duration
maxRestarts int
logger *log.Logger
}
func (m myTestTask) OnServe(host TaskHost) {
host.Supervisor.DeferFlow() // don't exit on underline server's Shutdown.
ticker := time.NewTicker(m.restartEvery)
defer ticker.Stop()
rans := 0
for {
select {
case _, ok := <-ticker.C:
{
if !ok {
m.logger.Println("ticker issue, closed channel, exiting from this task...")
return
}
exitAfterXRestarts := m.maxRestarts
if rans == exitAfterXRestarts {
m.logger.Println("exit")
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()
host.Supervisor.Shutdown(ctx) // total shutdown
host.Supervisor.RestoreFlow() // free to exit (if shutdown)
return
}
rans++
m.logger.Println(fmt.Sprintf("closed %d times", rans))
host.Shutdown(context.TODO())
startDelay := 2 * time.Second
time.AfterFunc(startDelay, func() {
m.logger.Println("restart")
host.Serve() // restart
})
}
}
}
}
func ExampleSupervisor_RegisterOnServe() {
h := New(&http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
}),
})
logger := log.New(os.Stdout, "Supervisor: ", 0)
mytask := myTestTask{
restartEvery: 6 * time.Second,
maxRestarts: 2,
logger: logger,
}
h.RegisterOnServe(mytask.OnServe)
ln, err := net.Listen("tcp4", ":9394")
if err != nil {
panic(err.Error())
}
logger.Println("server started...")
h.Serve(ln)
// Output:
// Supervisor: server started...
// Supervisor: closed 1 times
// Supervisor: restart
// Supervisor: closed 2 times
// Supervisor: restart
// Supervisor: exit
}

View File

@@ -49,7 +49,7 @@ func newTester(t *testing.T, baseURL string, handler http.Handler) *httpexpect.E
return httpexpect.WithConfig(testConfiguration)
}
func testSupervisor(t *testing.T, creator func(*http.Server, []TaskRunner) *Supervisor) {
func testSupervisor(t *testing.T, creator func(*http.Server, []func(TaskHost)) *Supervisor) {
loggerOutput := &bytes.Buffer{}
logger := log.New(loggerOutput, "", 0)
const (
@@ -76,11 +76,11 @@ func testSupervisor(t *testing.T, creator func(*http.Server, []TaskRunner) *Supe
t.Fatal(err)
}
helloMe := TaskRunnerFunc(func(proc TaskProcess) {
helloMe := func(_ TaskHost) {
logger.Print(expectedHelloMessage)
})
}
host := creator(srv, []TaskRunner{helloMe})
host := creator(srv, []func(TaskHost){helloMe})
defer host.Shutdown(context.TODO())
go host.Serve(ln)
@@ -99,10 +99,10 @@ func testSupervisor(t *testing.T, creator func(*http.Server, []TaskRunner) *Supe
}
}
func TestSupervisor(t *testing.T) {
testSupervisor(t, func(srv *http.Server, tasks []TaskRunner) *Supervisor {
testSupervisor(t, func(srv *http.Server, tasks []func(TaskHost)) *Supervisor {
su := New(srv)
for _, t := range tasks {
su.Schedule(t)
su.RegisterOnServe(t)
}
return su

View File

@@ -1,7 +1,3 @@
// Copyright 2017 Gerasimos Maropoulos, ΓΜ. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package host
// the 24hour name was "Supervisor" but it's not cover its usage
@@ -10,57 +6,61 @@ package host
// supervisor.
import (
"context"
"github.com/kataras/iris/core/nettools"
"fmt"
"io"
"net/http"
"runtime"
"time"
"github.com/kataras/iris/core/netutil"
)
type (
// FlowController exports the `DeferFlow`
// and `RestoreFlow` capabilities.
// Read more at Supervisor.
FlowController interface {
DeferFlow()
RestoreFlow()
// WriteStartupLogOnServe is a task which accepts a logger(io.Writer)
// and logs the listening address
// by a generated message based on the host supervisor's server and writes it to the "w".
// This function should be registered on Serve.
func WriteStartupLogOnServe(w io.Writer) func(TaskHost) {
return func(h TaskHost) {
guessScheme := netutil.ResolveScheme(h.Supervisor.manuallyTLS)
listeningURI := netutil.ResolveURL(guessScheme, h.Supervisor.Server.Addr)
interruptkey := "CTRL"
if runtime.GOOS == "darwin" {
interruptkey = "CMD"
}
w.Write([]byte(fmt.Sprintf("Now listening on: %s\nApplication started. Press %s+C to shut down.\n",
listeningURI, interruptkey)))
}
)
}
// ShutdownOnInterrupt terminates the supervisor and its underline server when CMD+C/CTRL+C pressed.
// This function should be registerd on Interrupt.
func ShutdownOnInterrupt(su *Supervisor, shutdownTimeout time.Duration) func() {
return func() {
ctx, cancel := context.WithTimeout(context.TODO(), shutdownTimeout)
defer cancel()
su.Shutdown(ctx)
su.RestoreFlow()
}
}
// TaskHost contains all the necessary information
// about the host supervisor, its server
// and the exports the whole flow controller of it.
type TaskHost struct {
su *Supervisor
// Supervisor with access fields when server is running, i.e restrict access to "Schedule"
// Server that running, is active and open
// Flow controller
FlowController
// Various
pid int
doneChan chan struct{}
errChan chan error
}
// Done filled when server was shutdown.
func (h TaskHost) Done() <-chan struct{} {
return h.doneChan
}
// Err filled when server received an error.
func (h TaskHost) Err() <-chan error {
return h.errChan
Supervisor *Supervisor
}
// Serve can (re)run the server with the latest known configuration.
func (h TaskHost) Serve() error {
// the underline server's serve, using the "latest known" listener from the supervisor.
l, err := h.su.newListener()
l, err := h.Supervisor.newListener()
if err != nil {
return err
}
// if http.serverclosed ignroe the error, it will have this error
// from the previous close
if err := h.su.server.Serve(l); err != http.ErrServerClosed {
if err := h.Supervisor.Server.Serve(l); err != http.ErrServerClosed {
return err
}
return nil
@@ -69,12 +69,12 @@ func (h TaskHost) Serve() error {
// HostURL returns the listening full url (scheme+host)
// based on the supervisor's server's address.
func (h TaskHost) HostURL() string {
return nettools.ResolveURLFromServer(h.su.server)
return netutil.ResolveURLFromServer(h.Supervisor.Server)
}
// Hostname returns the underline server's hostname.
func (h TaskHost) Hostname() string {
return nettools.ResolveHostname(h.su.server.Addr)
return netutil.ResolveHostname(h.Supervisor.Server.Addr)
}
// Shutdown gracefully shuts down the server without interrupting any
@@ -88,77 +88,16 @@ func (h TaskHost) Hostname() string {
// connections such as WebSockets. The caller of Shutdown should
// separately notify such long-lived connections of shutdown and wait
// for them to close, if desired.
//
// This Shutdown calls the underline's Server's Shutdown, in order to be able to re-start the server
// from a task.
func (h TaskHost) Shutdown(ctx context.Context) error {
// the underline server's Shutdown (otherwise we will cancel all tasks and do cycles)
return h.su.server.Shutdown(ctx)
}
// TaskProcess is the context of the Task runner.
// Contains the host's information and actions
// and its self cancelation emmiter.
type TaskProcess struct {
canceledChan chan struct{}
host TaskHost
}
// Done filled when this task is canceled.
func (p TaskProcess) Done() <-chan struct{} {
return p.canceledChan
}
// Host returns the TaskHost.
//
// TaskHost contains all the necessary information
// about the host supervisor, its server
// and the exports the whole flow controller of it.
func (p TaskProcess) Host() TaskHost {
return p.host
return h.Supervisor.Server.Shutdown(ctx)
}
func createTaskHost(su *Supervisor) TaskHost {
host := TaskHost{
su: su,
FlowController: su,
doneChan: make(chan struct{}),
errChan: make(chan error),
}
return host
}
func newTaskProcess(host TaskHost) TaskProcess {
return TaskProcess{
host: host,
canceledChan: make(chan struct{}),
return TaskHost{
Supervisor: su,
}
}
// A TaskRunner is an independent stream of instructions in a Supervisor.
// A routine is similar to a sequential program.
// However, a routine itself is not a program,
// it can't run on its own, instead it runs within a Supervisor's context.
//
// The real usage of a routine is not about a single sequential thread,
// but rather using multiple tasks in a single Supervisor.
// Multiple tasks running at the same time and performing various tasks is referred as Multithreading.
// A Task is considered to be a lightweight process because it runs within the context of a Supervisor
// and takes advantage of resources allocated for that Supervisor and its Server.
type TaskRunner interface {
// Run runs the task based on its TaskProcess which contains
// all the necessary information and actions to control the host supervisor
// and its server.
Run(TaskProcess)
}
// TaskRunnerFunc "converts" a func(TaskProcess) to a complete TaskRunner.
// Its functionality is exactly the same as TaskRunner.
//
// See `TaskRunner` too.
type TaskRunnerFunc func(TaskProcess)
// Run runs the task based on its TaskProcess which contains
// all the necessary information and actions to control the host supervisor
// and its server.
func (s TaskRunnerFunc) Run(proc TaskProcess) {
s(proc)
}

View File

@@ -1,27 +0,0 @@
// Copyright 2017 Gerasimos Maropoulos, ΓΜ. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package host
import (
"fmt"
"io"
"runtime"
)
// WriteBannerTask is a task which accepts a logger(io.Writer)
// and a "banner" text to write to following
// by a generated message based on the host supervisor's server and writes it to the "w".
// This task runs on serve.
func WriteBannerTask(w io.Writer, banner string) TaskRunnerFunc {
return func(proc TaskProcess) {
listeningURI := proc.Host().HostURL()
interruptkey := "CTRL"
if runtime.GOOS == "darwin" {
interruptkey = "CMD"
}
w.Write([]byte(fmt.Sprintf("%s\n\nNow listening on: %s\nApplication started. Press %s+C to shut down.\n",
banner, listeningURI, interruptkey)))
}
}

View File

@@ -1,44 +0,0 @@
// white-box testing
package host
import (
"context"
"fmt"
"net/http"
"time"
)
func TaskHostError() {
su := New(&http.Server{Addr: ":8273", Handler: http.DefaultServeMux})
su.ScheduleFunc(func(proc TaskProcess) {
select {
case err := <-proc.Host().Err():
fmt.Println(err.Error())
}
})
su.ScheduleFunc(func(proc TaskProcess) {
select {
case err := <-proc.Host().Err():
fmt.Println(err.Error())
}
})
su.ScheduleFunc(func(proc TaskProcess) {
select {
case err := <-proc.Host().Err():
fmt.Println(err.Error())
}
})
go su.ListenAndServe()
time.Sleep(1 * time.Second)
su.Shutdown(context.TODO())
time.Sleep(1 * time.Second)
// Output:
// http: Server closed
// http: Server closed
// http: Server closed
}

View File

@@ -1,30 +0,0 @@
// Copyright 2017 Gerasimos Maropoulos, ΓΜ. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package host
import (
"context"
"time"
)
// OnInterrupt is a built'n supervisor task type which fires its
// value(Task) when an OS interrupt/kill signal received.
type OnInterrupt TaskRunnerFunc
// Run runs the interrupt task and completes the TaskRunner interface.
func (t OnInterrupt) Run(proc TaskProcess) {
t(proc)
}
// ShutdownOnInterruptTask returns a supervisor's built'n task which
// shutdowns the server when InterruptSignalTask fire this task.
func ShutdownOnInterruptTask(shutdownTimeout time.Duration) TaskRunner {
return OnInterrupt(func(proc TaskProcess) {
ctx, cancel := context.WithTimeout(context.TODO(), shutdownTimeout)
defer cancel()
proc.Host().Shutdown(ctx)
proc.Host().RestoreFlow()
})
}

61
core/host/world.go Normal file
View File

@@ -0,0 +1,61 @@
package host
import (
"os"
"os/signal"
"sync"
"syscall"
)
// package-level interrupt notifier and event firing.
type world struct {
mu sync.Mutex
// onInterrupt contains a list of the functions that should be called when CTRL+C/CMD+C or
// a unix kill command received.
onInterrupt []func()
}
var w = &world{}
// RegisterOnInterrupt registers a global function to call when CTRL+C/CMD+C pressed or a unix kill command received.
func RegisterOnInterrupt(cb func()) {
w.mu.Lock()
w.onInterrupt = append(w.onInterrupt, cb)
w.mu.Unlock()
}
func notifyInterrupt() {
w.mu.Lock()
for _, f := range w.onInterrupt {
go f()
}
w.mu.Unlock()
}
func tryStartInterruptNotifier() {
w.mu.Lock()
defer w.mu.Unlock()
if len(w.onInterrupt) > 0 {
// this can't be moved to the task interrupt's `Run` function
// because it will not catch more than one ctrl/cmd+c, so
// we do it here. These tasks are canceled already too.
go func() {
ch := make(chan os.Signal, 1)
signal.Notify(ch,
// kill -SIGINT XXXX or Ctrl+c
os.Interrupt,
syscall.SIGINT, // register that too, it should be ok
// os.Kill is equivalent with the syscall.SIGKILL
os.Kill,
syscall.SIGKILL, // register that too, it should be ok
// kill -SIGTERM XXXX
syscall.SIGTERM,
)
select {
case <-ch:
notifyInterrupt()
}
}()
}
}