mirror of
https://github.com/kataras/iris.git
synced 2026-01-10 05:25:58 +00:00
accesslog: NEW log broker and many more features
some fixes about context clone, fix response recorder concurrent access, fix reload views with only ParseTemplate and more
This commit is contained in:
@@ -18,22 +18,35 @@ func init() {
|
||||
context.SetHandlerName("iris/middleware/accesslog.*", "iris.accesslog")
|
||||
}
|
||||
|
||||
const accessLogFieldsContextKey = "iris.accesslog.request.fields"
|
||||
const (
|
||||
fieldsContextKey = "iris.accesslog.request.fields"
|
||||
skipLogContextKey = "iris.accesslog.request.skip"
|
||||
)
|
||||
|
||||
// GetFields returns the accesslog fields for this request.
|
||||
// Returns a store which the caller can use to
|
||||
// set/get/remove custom log fields. Use its `Set` method.
|
||||
func GetFields(ctx iris.Context) (fields *Fields) {
|
||||
if v := ctx.Values().Get(accessLogFieldsContextKey); v != nil {
|
||||
if v := ctx.Values().Get(fieldsContextKey); v != nil {
|
||||
fields = v.(*Fields)
|
||||
} else {
|
||||
fields = new(Fields)
|
||||
ctx.Values().Set(accessLogFieldsContextKey, fields)
|
||||
ctx.Values().Set(fieldsContextKey, fields)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Skip called when a specific route should be skipped from the logging process.
|
||||
// It's an easy to use alternative for iris.NewConditionalHandler.
|
||||
func Skip(ctx iris.Context) {
|
||||
ctx.Values().Set(skipLogContextKey, struct{}{})
|
||||
}
|
||||
|
||||
func shouldSkip(ctx iris.Context) bool {
|
||||
return ctx.Values().Get(skipLogContextKey) != nil
|
||||
}
|
||||
|
||||
type (
|
||||
|
||||
// Fields is a type alias for memstore.Store, used to set
|
||||
@@ -137,6 +150,7 @@ type AccessLog struct {
|
||||
// order of registration so use a slice and
|
||||
// take the field key from the extractor itself.
|
||||
formatter Formatter
|
||||
broker *Broker
|
||||
}
|
||||
|
||||
// New returns a new AccessLog value with the default values.
|
||||
@@ -179,6 +193,28 @@ func File(path string) *AccessLog {
|
||||
return New(f)
|
||||
}
|
||||
|
||||
// Broker creates or returns the broker.
|
||||
// Use its `NewListener` and `CloseListener`
|
||||
// to listen and unlisten for incoming logs.
|
||||
//
|
||||
// Should be called before serve-time.
|
||||
func (ac *AccessLog) Broker() *Broker {
|
||||
ac.mu.Lock()
|
||||
if ac.broker == nil {
|
||||
ac.broker = newBroker()
|
||||
// atomic.StoreUint32(&ac.brokerActive, 1)
|
||||
}
|
||||
ac.mu.Unlock()
|
||||
return ac.broker
|
||||
}
|
||||
|
||||
// func (ac *AccessLog) isBrokerActive() bool { // see `Print` method.
|
||||
// return atomic.LoadUint32(&ac.brokerActive) > 0
|
||||
// }
|
||||
// ^ No need, we declare that the Broker should be called
|
||||
// before serve-time. Let's respect our comment
|
||||
// and don't try to make it safe for write and read concurrent access.
|
||||
|
||||
// Write writes to the log destination.
|
||||
// It completes the io.Writer interface.
|
||||
// Safe for concurrent use.
|
||||
@@ -294,6 +330,11 @@ func (ac *AccessLog) shouldReadResponseBody() bool {
|
||||
// defer ac.Close()
|
||||
// app.UseRouter(ac.Handler)
|
||||
func (ac *AccessLog) Handler(ctx *context.Context) {
|
||||
if shouldSkip(ctx) { // usage: another middleware before that one disables logging.
|
||||
ctx.Next()
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
startTime = time.Now()
|
||||
// Store some values, as future handler chain
|
||||
@@ -314,13 +355,17 @@ func (ac *AccessLog) Handler(ctx *context.Context) {
|
||||
|
||||
// Set the fields context value so they can be modified
|
||||
// on the following handlers chain. Same as `AddFields` but per-request.
|
||||
// ctx.Values().Set(accessLogFieldsContextKey, new(Fields))
|
||||
// ctx.Values().Set(fieldsContextKey, new(Fields))
|
||||
// No need ^ The GetFields will set it if it's missing.
|
||||
// So we initialize them whenever, and if, asked.
|
||||
|
||||
// Proceed to the handlers chain.
|
||||
ctx.Next()
|
||||
|
||||
if shouldSkip(ctx) { // normal flow, we can get the context by executing the handler first.
|
||||
return
|
||||
}
|
||||
|
||||
latency := time.Since(startTime)
|
||||
if ac.Async {
|
||||
ctxCopy := ctx.Clone()
|
||||
@@ -435,7 +480,7 @@ func (ac *AccessLog) after(ctx *context.Context, lat time.Duration, method, path
|
||||
|
||||
// Print writes a log manually.
|
||||
// The `Handler` method calls it.
|
||||
func (ac *AccessLog) Print(ctx *context.Context, latency time.Duration, timeFormat string, code int, method, path, reqBody, respBody string, bytesReceived, bytesSent int, params *context.RequestParams, query []memstore.StringEntry, fields []memstore.Entry) error {
|
||||
func (ac *AccessLog) Print(ctx *context.Context, latency time.Duration, timeFormat string, code int, method, path, reqBody, respBody string, bytesReceived, bytesSent int, params *context.RequestParams, query []memstore.StringEntry, fields []memstore.Entry) (err error) {
|
||||
var now time.Time
|
||||
|
||||
if ac.Clock != nil {
|
||||
@@ -444,7 +489,7 @@ func (ac *AccessLog) Print(ctx *context.Context, latency time.Duration, timeForm
|
||||
now = time.Now()
|
||||
}
|
||||
|
||||
if f := ac.formatter; f != nil {
|
||||
if hasFormatter, hasBroker := ac.formatter != nil, ac.broker != nil; hasFormatter || hasBroker {
|
||||
log := &Log{
|
||||
Logger: ac,
|
||||
Now: now,
|
||||
@@ -464,12 +509,21 @@ func (ac *AccessLog) Print(ctx *context.Context, latency time.Duration, timeForm
|
||||
Ctx: ctx, // ctx should only be used here, it may be nil on testing.
|
||||
}
|
||||
|
||||
if err := f.Format(log); err != nil {
|
||||
return err
|
||||
var handled bool
|
||||
if hasFormatter {
|
||||
handled, err = ac.formatter.Format(log)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// OK, it's handled, exit now.
|
||||
return nil
|
||||
if hasBroker { // after Format, it may want to customize the log's fields.
|
||||
ac.broker.notify(log)
|
||||
}
|
||||
|
||||
if handled {
|
||||
return // OK, it's handled, exit now.
|
||||
}
|
||||
}
|
||||
|
||||
// url parameters, path parameters and custom fields separated by space,
|
||||
@@ -478,7 +532,7 @@ func (ac *AccessLog) Print(ctx *context.Context, latency time.Duration, timeForm
|
||||
|
||||
// the number of separators are the same, in order to be easier
|
||||
// for 3rd-party programs to read the result log file.
|
||||
_, err := fmt.Fprintf(ac, "%s|%s|%s|%s|%s|%d|%s|%s|%s|%s|\n",
|
||||
_, err = fmt.Fprintf(ac, "%s|%s|%s|%s|%s|%d|%s|%s|%s|%s|\n",
|
||||
now.Format(timeFormat),
|
||||
latency,
|
||||
method,
|
||||
@@ -491,5 +545,5 @@ func (ac *AccessLog) Print(ctx *context.Context, latency time.Duration, timeForm
|
||||
respBody,
|
||||
)
|
||||
|
||||
return err
|
||||
return
|
||||
}
|
||||
|
||||
90
middleware/accesslog/broker.go
Normal file
90
middleware/accesslog/broker.go
Normal file
@@ -0,0 +1,90 @@
|
||||
package accesslog
|
||||
|
||||
// LogChan describes the log channel.
|
||||
// See `Broker` for details.
|
||||
type LogChan chan *Log
|
||||
|
||||
// A Broker holds the active listeners,
|
||||
// incoming logs on its Notifier channel
|
||||
// and broadcast event data to all registered listeners.
|
||||
//
|
||||
// Exports the `NewListener` and `CloseListener` methods.
|
||||
type Broker struct {
|
||||
// Logs are pushed to this channel
|
||||
// by the main events-gathering `run` routine.
|
||||
Notifier LogChan
|
||||
|
||||
// NewListener action.
|
||||
newListeners chan LogChan
|
||||
|
||||
// CloseListener action.
|
||||
closingListeners chan chan *Log
|
||||
|
||||
// listeners store.
|
||||
listeners map[LogChan]bool
|
||||
}
|
||||
|
||||
// newBroker returns a new broker factory.
|
||||
func newBroker() *Broker {
|
||||
b := &Broker{
|
||||
Notifier: make(LogChan, 1),
|
||||
newListeners: make(chan LogChan),
|
||||
closingListeners: make(chan chan *Log),
|
||||
listeners: make(map[LogChan]bool),
|
||||
}
|
||||
|
||||
// Listens and Broadcasts events.
|
||||
go b.run()
|
||||
|
||||
return b
|
||||
}
|
||||
|
||||
// run listens on different channels and act accordingly.
|
||||
func (b *Broker) run() {
|
||||
for {
|
||||
select {
|
||||
case s := <-b.newListeners:
|
||||
// A new channel has started to listen.
|
||||
b.listeners[s] = true
|
||||
|
||||
case s := <-b.closingListeners:
|
||||
// A listener has dettached.
|
||||
// Stop sending them the logs.
|
||||
delete(b.listeners, s)
|
||||
|
||||
case log := <-b.Notifier:
|
||||
// A new log sent by the logger.
|
||||
// Send it to all active listeners.
|
||||
for clientMessageChan := range b.listeners {
|
||||
clientMessageChan <- log
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// notify sends the "log" to all active listeners.
|
||||
func (b *Broker) notify(log *Log) {
|
||||
b.Notifier <- log
|
||||
}
|
||||
|
||||
// NewListener returns a new log channel listener.
|
||||
// The caller SHALL NOT use this to write logs.
|
||||
func (b *Broker) NewListener() LogChan {
|
||||
// Each listener registers its own message channel with the Broker's connections registry.
|
||||
logs := make(LogChan)
|
||||
// Signal the broker that we have a new listener.
|
||||
b.newListeners <- logs
|
||||
return logs
|
||||
}
|
||||
|
||||
// CloseListener removes the "ln" listener from the active listeners.
|
||||
func (b *Broker) CloseListener(ln LogChan) {
|
||||
b.closingListeners <- ln
|
||||
}
|
||||
|
||||
// As we cant export a read-only and pass it as closing client
|
||||
// we will return a read-write channel on NewListener and add a note that the user
|
||||
// should NOT send data back to the channel, its use is read-only.
|
||||
// func (b *Broker) CloseListener(ln <-chan *Log) {
|
||||
// b.closingListeners <- ln
|
||||
// }
|
||||
@@ -34,9 +34,9 @@ type Log struct {
|
||||
// The response status code.
|
||||
Code int `json:"code"`
|
||||
// Sorted URL Query arguments.
|
||||
Query []memstore.StringEntry `json:"query"`
|
||||
Query []memstore.StringEntry `json:"query,omitempty"`
|
||||
// Dynamic path parameters.
|
||||
PathParams []memstore.Entry `json:"params"`
|
||||
PathParams []memstore.Entry `json:"params,omitempty"`
|
||||
// Fields any data information useful to represent this Log.
|
||||
Fields []memstore.Entry `json:"fields,omitempty"`
|
||||
|
||||
@@ -127,15 +127,17 @@ func parseRequestValues(code int, pathParams *context.RequestParams, query []mem
|
||||
|
||||
// Formatter is responsible to print a Log to the accesslog's writer.
|
||||
type Formatter interface {
|
||||
// SetOutput should inject the accesslog's direct output,
|
||||
// if this "dest" is used then the Formatter
|
||||
// should manually control its concurrent use.
|
||||
SetOutput(dest io.Writer)
|
||||
// Format should print the Log.
|
||||
// Returns nil error on handle successfully,
|
||||
// otherwise the log will be printed using the default formatter
|
||||
// and the error will be printed to the Iris Application's error log level.
|
||||
Format(log *Log) error
|
||||
// SetWriter should inject the accesslog's direct output,
|
||||
// if this "dest" is used then the Formatter
|
||||
// should manually control its concurrent use.
|
||||
SetOutput(dest io.Writer)
|
||||
// Should return true if this handled the logging, otherwise false to
|
||||
// continue with the default print format.
|
||||
Format(log *Log) (bool, error)
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -169,12 +171,12 @@ func (f *JSON) SetOutput(dest io.Writer) {
|
||||
// Format prints the logs in JSON format.
|
||||
// Writes to the destination directly,
|
||||
// locks on each Format call.
|
||||
func (f *JSON) Format(log *Log) error {
|
||||
func (f *JSON) Format(log *Log) (bool, error) {
|
||||
f.mu.Lock()
|
||||
err := f.enc.Encode(log)
|
||||
f.mu.Unlock()
|
||||
|
||||
return err
|
||||
return true, err
|
||||
}
|
||||
|
||||
// Template is a Formatter.
|
||||
@@ -213,7 +215,7 @@ func (f *Template) SetOutput(dest io.Writer) {
|
||||
const defaultTmplText = "{{.Now.Format .TimeFormat}}|{{.Latency}}|{{.Method}}|{{.Path}}|{{.RequestValuesLine}}|{{.Code}}|{{.BytesReceivedLine}}|{{.BytesSentLine}}|{{.Request}}|{{.Response}}|\n"
|
||||
|
||||
// Format prints the logs in text/template format.
|
||||
func (f *Template) Format(log *Log) error {
|
||||
func (f *Template) Format(log *Log) (bool, error) {
|
||||
var err error
|
||||
|
||||
// A template may be executed safely in parallel, although if parallel
|
||||
@@ -226,5 +228,5 @@ func (f *Template) Format(log *Log) error {
|
||||
}
|
||||
f.mu.Unlock()
|
||||
|
||||
return err
|
||||
return true, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user