mirror of
https://github.com/kataras/iris.git
synced 2026-01-22 11:25:59 +00:00
Sessions are now in full sync with the registered database, on acquire(init), set, get, delete, clear, visit, len, release(destroy) as requested by almost everyone. https://github.com/kataras/iris/issues/969
Former-commit-id: 49fcdb93106a78f0a24ad3fb4d8725e35e98451a
This commit is contained in:
@@ -3,6 +3,7 @@ package badger
|
||||
import (
|
||||
"os"
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
"github.com/kataras/golog"
|
||||
"github.com/kataras/iris/core/errors"
|
||||
@@ -25,6 +26,8 @@ type Database struct {
|
||||
Service *badger.DB
|
||||
}
|
||||
|
||||
var _ sessions.Database = (*Database)(nil)
|
||||
|
||||
// New creates and returns a new badger(key-value file-based) storage
|
||||
// instance based on the "directoryPath".
|
||||
// DirectoryPath should is the directory which the badger database will store the sessions,
|
||||
@@ -32,9 +35,8 @@ type Database struct {
|
||||
//
|
||||
// It will remove any old session files.
|
||||
func New(directoryPath string) (*Database, error) {
|
||||
|
||||
if directoryPath == "" {
|
||||
return nil, errors.New("dir is missing")
|
||||
return nil, errors.New("directoryPath is missing")
|
||||
}
|
||||
|
||||
lindex := directoryPath[len(directoryPath)-1]
|
||||
@@ -57,134 +59,180 @@ func New(directoryPath string) (*Database, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return NewFromDB(service)
|
||||
return NewFromDB(service), nil
|
||||
}
|
||||
|
||||
// NewFromDB same as `New` but accepts an already-created custom badger connection instead.
|
||||
func NewFromDB(service *badger.DB) (*Database, error) {
|
||||
if service == nil {
|
||||
return nil, errors.New("underline database is missing")
|
||||
}
|
||||
|
||||
func NewFromDB(service *badger.DB) *Database {
|
||||
db := &Database{Service: service}
|
||||
|
||||
runtime.SetFinalizer(db, closeDB)
|
||||
return db, db.Cleanup()
|
||||
}
|
||||
|
||||
// Cleanup removes any invalid(have expired) session entries,
|
||||
// it's being called automatically on `New` as well.
|
||||
func (db *Database) Cleanup() (err error) {
|
||||
rep := errors.NewReporter()
|
||||
|
||||
txn := db.Service.NewTransaction(true)
|
||||
defer txn.Commit(nil)
|
||||
|
||||
iter := txn.NewIterator(badger.DefaultIteratorOptions)
|
||||
defer iter.Close()
|
||||
|
||||
for iter.Rewind(); iter.Valid(); iter.Next() {
|
||||
// Remember that the contents of the returned slice should not be modified, and
|
||||
// only valid until the next call to Next.
|
||||
item := iter.Item()
|
||||
b, err := item.Value()
|
||||
|
||||
if rep.AddErr(err) {
|
||||
continue
|
||||
}
|
||||
|
||||
storeDB, err := sessions.DecodeRemoteStore(b)
|
||||
if rep.AddErr(err) {
|
||||
continue
|
||||
}
|
||||
|
||||
if storeDB.Lifetime.HasExpired() {
|
||||
if err := txn.Delete(item.Key()); err != nil {
|
||||
rep.AddErr(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return rep.Return()
|
||||
}
|
||||
|
||||
// Async is DEPRECATED
|
||||
// if it was true then it could use different to update the back-end storage, now it does nothing.
|
||||
func (db *Database) Async(useGoRoutines bool) *Database {
|
||||
return db
|
||||
}
|
||||
|
||||
// Load loads the sessions from the badger(key-value file-based) session storage.
|
||||
func (db *Database) Load(sid string) (storeDB sessions.RemoteStore) {
|
||||
// Acquire receives a session's lifetime from the database,
|
||||
// if the return value is LifeTime{} then the session manager sets the life time based on the expiration duration lives in configuration.
|
||||
func (db *Database) Acquire(sid string, expires time.Duration) sessions.LifeTime {
|
||||
txn := db.Service.NewTransaction(true)
|
||||
defer txn.Commit(nil)
|
||||
|
||||
bsid := []byte(sid)
|
||||
|
||||
txn := db.Service.NewTransaction(false)
|
||||
defer txn.Discard()
|
||||
|
||||
item, err := txn.Get(bsid)
|
||||
if err == nil {
|
||||
// found, return the expiration.
|
||||
return sessions.LifeTime{Time: time.Unix(int64(item.ExpiresAt()), 0)}
|
||||
}
|
||||
|
||||
// not found, create an entry with ttl and return an empty lifetime, session manager will do its job.
|
||||
if err != nil {
|
||||
// Key not found, don't report this, session manager will create a new session as it should.
|
||||
if err == badger.ErrKeyNotFound {
|
||||
// create it and set the expiration, we don't care about the value there.
|
||||
err = txn.SetWithTTL(bsid, bsid, expires)
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
golog.Error(err)
|
||||
}
|
||||
|
||||
return sessions.LifeTime{} // session manager will handle the rest.
|
||||
}
|
||||
|
||||
var delim = byte('*')
|
||||
|
||||
func makeKey(sid, key string) []byte {
|
||||
return append([]byte(sid), append([]byte(key), delim)...)
|
||||
}
|
||||
|
||||
// Set sets a key value of a specific session.
|
||||
// Ignore the "immutable".
|
||||
func (db *Database) Set(sid string, lifetime sessions.LifeTime, key string, value interface{}, immutable bool) {
|
||||
valueBytes, err := sessions.DefaultTranscoder.Marshal(value)
|
||||
if err != nil {
|
||||
golog.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
b, err := item.Value()
|
||||
err = db.Service.Update(func(txn *badger.Txn) error {
|
||||
return txn.SetWithTTL(makeKey(sid, key), valueBytes, lifetime.DurationUntilExpiration())
|
||||
// return txn.Set(makeKey(sid, key), valueBytes)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
golog.Errorf("error while trying to get the serialized session(%s) from the remote store: %v", sid, err)
|
||||
return
|
||||
golog.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
storeDB, err = sessions.DecodeRemoteStore(b) // decode the whole value, as a remote store
|
||||
if err != nil {
|
||||
golog.Errorf("error while trying to load from the remote store: %v", err)
|
||||
// Get retrieves a session value based on the key.
|
||||
func (db *Database) Get(sid string, key string) (value interface{}) {
|
||||
err := db.Service.View(func(txn *badger.Txn) error {
|
||||
item, err := txn.Get(makeKey(sid, key))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// item.ValueCopy
|
||||
valueBytes, err := item.Value()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return sessions.DefaultTranscoder.Unmarshal(valueBytes, &value)
|
||||
})
|
||||
|
||||
if err != nil && err != badger.ErrKeyNotFound {
|
||||
golog.Error(err)
|
||||
return nil
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Sync syncs the database with the session's (memory) store.
|
||||
func (db *Database) Sync(p sessions.SyncPayload) {
|
||||
db.sync(p)
|
||||
}
|
||||
// Visit loops through all session keys and values.
|
||||
func (db *Database) Visit(sid string, cb func(key string, value interface{})) {
|
||||
prefix := append([]byte(sid), delim)
|
||||
|
||||
func (db *Database) sync(p sessions.SyncPayload) {
|
||||
bsid := []byte(p.SessionID)
|
||||
txn := db.Service.NewTransaction(false)
|
||||
defer txn.Discard()
|
||||
|
||||
if p.Action == sessions.ActionDestroy {
|
||||
if err := db.destroy(bsid); err != nil {
|
||||
golog.Errorf("error while destroying a session(%s) from badger: %v",
|
||||
p.SessionID, err)
|
||||
iter := txn.NewIterator(badger.DefaultIteratorOptions)
|
||||
defer iter.Close()
|
||||
|
||||
for iter.Rewind(); iter.ValidForPrefix(prefix); iter.Next() {
|
||||
item := iter.Item()
|
||||
valueBytes, err := item.Value()
|
||||
if err != nil {
|
||||
golog.Error(err)
|
||||
continue
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
s, err := p.Store.Serialize()
|
||||
if err != nil {
|
||||
golog.Errorf("error while serializing the remote store: %v", err)
|
||||
}
|
||||
var value interface{}
|
||||
if err = sessions.DefaultTranscoder.Unmarshal(valueBytes, &value); err != nil {
|
||||
golog.Error(err)
|
||||
continue
|
||||
}
|
||||
|
||||
txn := db.Service.NewTransaction(true)
|
||||
|
||||
err = txn.Set(bsid, s)
|
||||
if err != nil {
|
||||
txn.Discard()
|
||||
golog.Errorf("error while trying to save the session(%s) to the database: %v", p.SessionID, err)
|
||||
return
|
||||
}
|
||||
if err := txn.Commit(nil); err != nil { // Commit will call the Discard automatically.
|
||||
golog.Errorf("error while committing the session(%s) changes to the database: %v", p.SessionID, err)
|
||||
cb(string(item.Key()), value)
|
||||
}
|
||||
}
|
||||
|
||||
func (db *Database) destroy(bsid []byte) error {
|
||||
txn := db.Service.NewTransaction(true)
|
||||
var iterOptionsNoValues = badger.IteratorOptions{
|
||||
PrefetchValues: false,
|
||||
PrefetchSize: 100,
|
||||
Reverse: false,
|
||||
AllVersions: false,
|
||||
}
|
||||
|
||||
err := txn.Delete(bsid)
|
||||
if err != nil {
|
||||
return err
|
||||
// Len returns the length of the session's entries (keys).
|
||||
func (db *Database) Len(sid string) (n int) {
|
||||
prefix := append([]byte(sid), delim)
|
||||
|
||||
txn := db.Service.NewTransaction(false)
|
||||
iter := txn.NewIterator(iterOptionsNoValues)
|
||||
|
||||
for iter.Rewind(); iter.ValidForPrefix(prefix); iter.Next() {
|
||||
n++
|
||||
}
|
||||
|
||||
return txn.Commit(nil)
|
||||
iter.Close()
|
||||
txn.Discard()
|
||||
return
|
||||
}
|
||||
|
||||
// Delete removes a session key value based on its key.
|
||||
func (db *Database) Delete(sid string, key string) (deleted bool) {
|
||||
txn := db.Service.NewTransaction(true)
|
||||
err := txn.Delete(makeKey(sid, key))
|
||||
if err != nil {
|
||||
golog.Error(err)
|
||||
}
|
||||
txn.Commit(nil)
|
||||
return err == nil
|
||||
}
|
||||
|
||||
// Clear removes all session key values but it keeps the session entry.
|
||||
func (db *Database) Clear(sid string) {
|
||||
prefix := append([]byte(sid), delim)
|
||||
|
||||
txn := db.Service.NewTransaction(true)
|
||||
defer txn.Commit(nil)
|
||||
|
||||
iter := txn.NewIterator(iterOptionsNoValues)
|
||||
defer iter.Close()
|
||||
|
||||
for iter.Rewind(); iter.ValidForPrefix(prefix); iter.Next() {
|
||||
txn.Delete(iter.Item().Key())
|
||||
}
|
||||
}
|
||||
|
||||
// Release destroys the session, it clears and removes the session entry,
|
||||
// session manager will create a new session ID on the next request after this call.
|
||||
func (db *Database) Release(sid string) {
|
||||
// clear all $sid-$key.
|
||||
db.Clear(sid)
|
||||
// and remove the $sid.
|
||||
txn := db.Service.NewTransaction(true)
|
||||
txn.Delete([]byte(sid))
|
||||
txn.Commit(nil)
|
||||
}
|
||||
|
||||
// Close shutdowns the badger connection.
|
||||
|
||||
@@ -1,230 +0,0 @@
|
||||
package boltdb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/bbolt"
|
||||
"github.com/kataras/golog"
|
||||
"github.com/kataras/iris/core/errors"
|
||||
"github.com/kataras/iris/sessions"
|
||||
)
|
||||
|
||||
// DefaultFileMode used as the default database's "fileMode"
|
||||
// for creating the sessions directory path, opening and write
|
||||
// the session boltdb(file-based) storage.
|
||||
var (
|
||||
DefaultFileMode = 0755
|
||||
)
|
||||
|
||||
// Database the BoltDB(file-based) session storage.
|
||||
type Database struct {
|
||||
table []byte
|
||||
// Service is the underline BoltDB database connection,
|
||||
// it's initialized at `New` or `NewFromDB`.
|
||||
// Can be used to get stats.
|
||||
Service *bolt.DB
|
||||
}
|
||||
|
||||
var (
|
||||
// ErrOptionsMissing returned on `New` when path or tableName are empty.
|
||||
ErrOptionsMissing = errors.New("required options are missing")
|
||||
)
|
||||
|
||||
// New creates and returns a new BoltDB(file-based) storage
|
||||
// instance based on the "path".
|
||||
// Path should include the filename and the directory(aka fullpath), i.e sessions/store.db.
|
||||
//
|
||||
// It will remove any old session files.
|
||||
func New(path string, fileMode os.FileMode, bucketName string) (*Database, error) {
|
||||
|
||||
if path == "" || bucketName == "" {
|
||||
return nil, ErrOptionsMissing
|
||||
}
|
||||
|
||||
if fileMode <= 0 {
|
||||
fileMode = os.FileMode(DefaultFileMode)
|
||||
}
|
||||
|
||||
// create directories if necessary
|
||||
if err := os.MkdirAll(filepath.Dir(path), fileMode); err != nil {
|
||||
golog.Errorf("error while trying to create the necessary directories for %s: %v", path, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
service, err := bolt.Open(path, 0600,
|
||||
&bolt.Options{Timeout: 15 * time.Second},
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
golog.Errorf("unable to initialize the BoltDB-based session database: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return NewFromDB(service, bucketName)
|
||||
}
|
||||
|
||||
// NewFromDB same as `New` but accepts an already-created custom boltdb connection instead.
|
||||
func NewFromDB(service *bolt.DB, bucketName string) (*Database, error) {
|
||||
if bucketName == "" {
|
||||
return nil, ErrOptionsMissing
|
||||
}
|
||||
bucket := []byte(bucketName)
|
||||
|
||||
service.Update(func(tx *bolt.Tx) (err error) {
|
||||
_, err = tx.CreateBucketIfNotExists(bucket)
|
||||
return
|
||||
})
|
||||
|
||||
db := &Database{table: bucket, Service: service}
|
||||
|
||||
runtime.SetFinalizer(db, closeDB)
|
||||
return db, db.Cleanup()
|
||||
}
|
||||
|
||||
// Cleanup removes any invalid(have expired) session entries,
|
||||
// it's being called automatically on `New` as well.
|
||||
func (db *Database) Cleanup() error {
|
||||
err := db.Service.Update(func(tx *bolt.Tx) error {
|
||||
b := db.getBucket(tx)
|
||||
c := b.Cursor()
|
||||
for k, v := c.First(); k != nil; k, v = c.Next() {
|
||||
if len(k) == 0 { // empty key, continue to the next pair
|
||||
continue
|
||||
}
|
||||
|
||||
storeDB, err := sessions.DecodeRemoteStore(v)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if storeDB.Lifetime.HasExpired() {
|
||||
if err := c.Delete(); err != nil {
|
||||
golog.Warnf("troubles when cleanup a session remote store from BoltDB: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Async is DEPRECATED
|
||||
// if it was true then it could use different to update the back-end storage, now it does nothing.
|
||||
func (db *Database) Async(useGoRoutines bool) *Database {
|
||||
return db
|
||||
}
|
||||
|
||||
// Load loads the sessions from the BoltDB(file-based) session storage.
|
||||
func (db *Database) Load(sid string) (storeDB sessions.RemoteStore) {
|
||||
bsid := []byte(sid)
|
||||
err := db.Service.View(func(tx *bolt.Tx) (err error) {
|
||||
// db.getSessBucket(tx, sid)
|
||||
b := db.getBucket(tx)
|
||||
c := b.Cursor()
|
||||
for k, v := c.First(); k != nil; k, v = c.Next() {
|
||||
if len(k) == 0 { // empty key, continue to the next pair
|
||||
continue
|
||||
}
|
||||
|
||||
if bytes.Equal(k, bsid) { // session id should be the name of the key-value pair
|
||||
storeDB, err = sessions.DecodeRemoteStore(v) // decode the whole value, as a remote store
|
||||
break
|
||||
}
|
||||
}
|
||||
return
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
golog.Errorf("error while trying to load from the remote store: %v", err)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Sync syncs the database with the session's (memory) store.
|
||||
func (db *Database) Sync(p sessions.SyncPayload) {
|
||||
db.sync(p)
|
||||
}
|
||||
|
||||
func (db *Database) sync(p sessions.SyncPayload) {
|
||||
bsid := []byte(p.SessionID)
|
||||
|
||||
if p.Action == sessions.ActionDestroy {
|
||||
if err := db.destroy(bsid); err != nil {
|
||||
golog.Errorf("error while destroying a session(%s) from boltdb: %v",
|
||||
p.SessionID, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
s, err := p.Store.Serialize()
|
||||
if err != nil {
|
||||
golog.Errorf("error while serializing the remote store: %v", err)
|
||||
}
|
||||
|
||||
err = db.Service.Update(func(tx *bolt.Tx) error {
|
||||
return db.getBucket(tx).Put(bsid, s)
|
||||
})
|
||||
if err != nil {
|
||||
golog.Errorf("error while writing the session bucket: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (db *Database) destroy(bsid []byte) error {
|
||||
return db.Service.Update(func(tx *bolt.Tx) error {
|
||||
return db.getBucket(tx).Delete(bsid)
|
||||
})
|
||||
}
|
||||
|
||||
// we store the whole data to the key-value pair of the root bucket
|
||||
// so we don't need a separate bucket for each session
|
||||
// this method could be faster if we had large data to store
|
||||
// but with sessions we recommend small amount of data, so the method finally chosen
|
||||
// is faster (decode/encode the whole store + lifetime and return it as it's)
|
||||
//
|
||||
// func (db *Database) getSessBucket(tx *bolt.Tx, sid string) (*bolt.Bucket, error) {
|
||||
// table, err := db.getBucket(tx).CreateBucketIfNotExists([]byte(sid))
|
||||
// return table, err
|
||||
// }
|
||||
|
||||
func (db *Database) getBucket(tx *bolt.Tx) *bolt.Bucket {
|
||||
return tx.Bucket(db.table)
|
||||
}
|
||||
|
||||
// Len reports the number of sessions that are stored to the this BoltDB table.
|
||||
func (db *Database) Len() (num int) {
|
||||
db.Service.View(func(tx *bolt.Tx) error {
|
||||
// Assume bucket exists and has keys
|
||||
b := db.getBucket(tx)
|
||||
if b == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
b.ForEach(func([]byte, []byte) error {
|
||||
num++
|
||||
return nil
|
||||
})
|
||||
return nil
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// Close shutdowns the BoltDB connection.
|
||||
func (db *Database) Close() error {
|
||||
return closeDB(db)
|
||||
}
|
||||
|
||||
func closeDB(db *Database) error {
|
||||
err := db.Service.Close()
|
||||
if err != nil {
|
||||
golog.Warnf("closing the BoltDB connection: %v", err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
@@ -1,203 +0,0 @@
|
||||
package file
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/kataras/golog"
|
||||
"github.com/kataras/iris/core/errors"
|
||||
"github.com/kataras/iris/sessions"
|
||||
)
|
||||
|
||||
// DefaultFileMode used as the default database's "fileMode"
|
||||
// for creating the sessions directory path, opening and write the session file.
|
||||
var (
|
||||
DefaultFileMode = 0755
|
||||
)
|
||||
|
||||
// Database is the basic file-storage session database.
|
||||
//
|
||||
// What it does
|
||||
// It removes old(expired) session files, at init (`Cleanup`).
|
||||
// It creates a session file on the first inserted key-value session data.
|
||||
// It removes a session file on destroy.
|
||||
// It sync the session file to the session's memstore on any other action (insert, delete, clear).
|
||||
// It automatically remove the session files on runtime when a session is expired.
|
||||
//
|
||||
// Remember: sessions are not a storage for large data, everywhere: on any platform on any programming language.
|
||||
type Database struct {
|
||||
dir string
|
||||
fileMode os.FileMode // defaults to DefaultFileMode if missing.
|
||||
}
|
||||
|
||||
// New creates and returns a new file-storage database instance based on the "directoryPath".
|
||||
// DirectoryPath should is the directory which the leveldb database will store the sessions,
|
||||
// i.e ./sessions/
|
||||
//
|
||||
// It will remove any old session files.
|
||||
func New(directoryPath string, fileMode os.FileMode) (*Database, error) {
|
||||
lindex := directoryPath[len(directoryPath)-1]
|
||||
if lindex != os.PathSeparator && lindex != '/' {
|
||||
directoryPath += string(os.PathSeparator)
|
||||
}
|
||||
|
||||
if fileMode <= 0 {
|
||||
fileMode = os.FileMode(DefaultFileMode)
|
||||
}
|
||||
|
||||
// create directories if necessary
|
||||
if err := os.MkdirAll(directoryPath, fileMode); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
db := &Database{dir: directoryPath, fileMode: fileMode}
|
||||
return db, db.Cleanup()
|
||||
}
|
||||
|
||||
// Cleanup removes any invalid(have expired) session files, it's being called automatically on `New` as well.
|
||||
func (db *Database) Cleanup() error {
|
||||
return filepath.Walk(db.dir, func(path string, info os.FileInfo, err error) error {
|
||||
if info.IsDir() {
|
||||
return nil
|
||||
}
|
||||
sessPath := path
|
||||
storeDB, _ := db.load(sessPath) // we don't care about errors here, the file may be not a session a file at all.
|
||||
if storeDB.Lifetime.HasExpired() {
|
||||
os.Remove(path)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// FileMode for creating the sessions directory path, opening and write the session file.
|
||||
//
|
||||
// Defaults to 0755.
|
||||
func (db *Database) FileMode(fileMode uint32) *Database {
|
||||
db.fileMode = os.FileMode(fileMode)
|
||||
return db
|
||||
}
|
||||
|
||||
// Async is DEPRECATED
|
||||
// if it was true then it could use different to update the back-end storage, now it does nothing.
|
||||
func (db *Database) Async(useGoRoutines bool) *Database {
|
||||
return db
|
||||
}
|
||||
|
||||
func (db *Database) sessPath(sid string) string {
|
||||
return filepath.Join(db.dir, sid)
|
||||
}
|
||||
|
||||
// Load loads the values from the storage and returns them
|
||||
func (db *Database) Load(sid string) sessions.RemoteStore {
|
||||
sessPath := db.sessPath(sid)
|
||||
store, err := db.load(sessPath)
|
||||
if err != nil {
|
||||
golog.Error(err.Error())
|
||||
}
|
||||
return store
|
||||
}
|
||||
|
||||
func (db *Database) load(fileName string) (storeDB sessions.RemoteStore, loadErr error) {
|
||||
f, err := os.OpenFile(fileName, os.O_RDONLY, db.fileMode)
|
||||
|
||||
if err != nil {
|
||||
// we don't care if filepath doesn't exists yet, it will be created later on.
|
||||
return
|
||||
}
|
||||
|
||||
defer f.Close()
|
||||
|
||||
contents, err := ioutil.ReadAll(f)
|
||||
|
||||
if err != nil {
|
||||
loadErr = errors.New("error while reading the session file's data: %v").Format(err)
|
||||
return
|
||||
}
|
||||
|
||||
storeDB, err = sessions.DecodeRemoteStore(contents)
|
||||
|
||||
if err != nil { // we care for this error only
|
||||
loadErr = errors.New("load error: %v").Format(err)
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Sync syncs the database.
|
||||
func (db *Database) Sync(p sessions.SyncPayload) {
|
||||
db.sync(p)
|
||||
}
|
||||
|
||||
func (db *Database) sync(p sessions.SyncPayload) {
|
||||
|
||||
// if destroy then remove the file from the disk
|
||||
if p.Action == sessions.ActionDestroy {
|
||||
if err := db.destroy(p.SessionID); err != nil {
|
||||
golog.Errorf("error while destroying and removing the session file: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if err := db.override(p.SessionID, p.Store); err != nil {
|
||||
golog.Errorf("error while writing the session file: %v", err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// good idea but doesn't work, it is not just an array of entries
|
||||
// which can be appended with the gob...anyway session data should be small so we don't have problem
|
||||
// with that:
|
||||
|
||||
// on insert new data, it appends to the file
|
||||
// func (db *Database) insert(sid string, entry memstore.Entry) error {
|
||||
// f, err := os.OpenFile(
|
||||
// db.sessPath(sid),
|
||||
// os.O_WRONLY|os.O_CREATE|os.O_RDWR|os.O_APPEND,
|
||||
// db.fileMode,
|
||||
// )
|
||||
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
|
||||
// if _, err := f.Write(serializeEntry(entry)); err != nil {
|
||||
// f.Close()
|
||||
// return err
|
||||
// }
|
||||
|
||||
// return f.Close()
|
||||
// }
|
||||
|
||||
// removes all entries but keeps the file.
|
||||
// func (db *Database) clearAll(sid string) error {
|
||||
// return ioutil.WriteFile(
|
||||
// db.sessPath(sid),
|
||||
// []byte{},
|
||||
// db.fileMode,
|
||||
// )
|
||||
// }
|
||||
|
||||
// on update, remove and clear, it re-writes the file to the current values(may empty).
|
||||
func (db *Database) override(sid string, store sessions.RemoteStore) error {
|
||||
s, err := store.Serialize()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return ioutil.WriteFile(
|
||||
db.sessPath(sid),
|
||||
s,
|
||||
db.fileMode,
|
||||
)
|
||||
}
|
||||
|
||||
// on destroy, it removes the file
|
||||
func (db *Database) destroy(sid string) error {
|
||||
return db.expireSess(sid)
|
||||
}
|
||||
|
||||
func (db *Database) expireSess(sid string) error {
|
||||
sessPath := db.sessPath(sid)
|
||||
return os.Remove(sessPath)
|
||||
}
|
||||
@@ -1,183 +0,0 @@
|
||||
package leveldb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"runtime"
|
||||
|
||||
"github.com/kataras/golog"
|
||||
"github.com/kataras/iris/core/errors"
|
||||
"github.com/kataras/iris/sessions"
|
||||
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/syndtr/goleveldb/leveldb/opt"
|
||||
)
|
||||
|
||||
var (
|
||||
// Options used to open the leveldb database, defaults to leveldb's default values.
|
||||
Options = &opt.Options{}
|
||||
// WriteOptions used to put and delete, defaults to leveldb's default values.
|
||||
WriteOptions = &opt.WriteOptions{}
|
||||
// ReadOptions used to iterate over the database, defaults to leveldb's default values.
|
||||
ReadOptions = &opt.ReadOptions{}
|
||||
)
|
||||
|
||||
// Database the LevelDB(file-based) session storage.
|
||||
type Database struct {
|
||||
// Service is the underline LevelDB database connection,
|
||||
// it's initialized at `New` or `NewFromDB`.
|
||||
// Can be used to get stats.
|
||||
Service *leveldb.DB
|
||||
}
|
||||
|
||||
// New creates and returns a new LevelDB(file-based) storage
|
||||
// instance based on the "directoryPath".
|
||||
// DirectoryPath should is the directory which the leveldb database will store the sessions,
|
||||
// i.e ./sessions/
|
||||
//
|
||||
// It will remove any old session files.
|
||||
func New(directoryPath string) (*Database, error) {
|
||||
|
||||
if directoryPath == "" {
|
||||
return nil, errors.New("dir is missing")
|
||||
}
|
||||
|
||||
// Second parameter is a "github.com/syndtr/goleveldb/leveldb/opt.Options{}"
|
||||
// user can change the `Options` or create the sessiondb via `NewFromDB`
|
||||
// if wants to use a customized leveldb database
|
||||
// or an existing one, we don't require leveldb options at the constructor.
|
||||
//
|
||||
// The leveldb creates the directories, if necessary.
|
||||
service, err := leveldb.OpenFile(directoryPath, Options)
|
||||
|
||||
if err != nil {
|
||||
golog.Errorf("unable to initialize the LevelDB-based session database: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return NewFromDB(service)
|
||||
}
|
||||
|
||||
// NewFromDB same as `New` but accepts an already-created custom leveldb connection instead.
|
||||
func NewFromDB(service *leveldb.DB) (*Database, error) {
|
||||
if service == nil {
|
||||
return nil, errors.New("underline database is missing")
|
||||
}
|
||||
|
||||
db := &Database{Service: service}
|
||||
|
||||
runtime.SetFinalizer(db, closeDB)
|
||||
return db, db.Cleanup()
|
||||
}
|
||||
|
||||
// Cleanup removes any invalid(have expired) session entries,
|
||||
// it's being called automatically on `New` as well.
|
||||
func (db *Database) Cleanup() error {
|
||||
iter := db.Service.NewIterator(nil, ReadOptions)
|
||||
for iter.Next() {
|
||||
// Remember that the contents of the returned slice should not be modified, and
|
||||
// only valid until the next call to Next.
|
||||
k := iter.Key()
|
||||
|
||||
if len(k) > 0 {
|
||||
v := iter.Value()
|
||||
storeDB, err := sessions.DecodeRemoteStore(v)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if storeDB.Lifetime.HasExpired() {
|
||||
if err := db.Service.Delete(k, WriteOptions); err != nil {
|
||||
golog.Warnf("troubles when cleanup a session remote store from LevelDB: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
iter.Release()
|
||||
return iter.Error()
|
||||
}
|
||||
|
||||
// Async is DEPRECATED
|
||||
// if it was true then it could use different to update the back-end storage, now it does nothing.
|
||||
func (db *Database) Async(useGoRoutines bool) *Database {
|
||||
return db
|
||||
}
|
||||
|
||||
// Load loads the sessions from the LevelDB(file-based) session storage.
|
||||
func (db *Database) Load(sid string) (storeDB sessions.RemoteStore) {
|
||||
bsid := []byte(sid)
|
||||
|
||||
iter := db.Service.NewIterator(nil, ReadOptions)
|
||||
for iter.Next() {
|
||||
// Remember that the contents of the returned slice should not be modified, and
|
||||
// only valid until the next call to Next.
|
||||
k := iter.Key()
|
||||
|
||||
if len(k) > 0 {
|
||||
v := iter.Value()
|
||||
if bytes.Equal(k, bsid) { // session id should be the name of the key-value pair
|
||||
store, err := sessions.DecodeRemoteStore(v) // decode the whole value, as a remote store
|
||||
if err != nil {
|
||||
golog.Errorf("error while trying to load from the remote store: %v", err)
|
||||
} else {
|
||||
storeDB = store
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
iter.Release()
|
||||
if err := iter.Error(); err != nil {
|
||||
golog.Errorf("error while trying to iterate over the database: %v", err)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Sync syncs the database with the session's (memory) store.
|
||||
func (db *Database) Sync(p sessions.SyncPayload) {
|
||||
db.sync(p)
|
||||
}
|
||||
|
||||
func (db *Database) sync(p sessions.SyncPayload) {
|
||||
bsid := []byte(p.SessionID)
|
||||
|
||||
if p.Action == sessions.ActionDestroy {
|
||||
if err := db.destroy(bsid); err != nil {
|
||||
golog.Errorf("error while destroying a session(%s) from leveldb: %v",
|
||||
p.SessionID, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
s, err := p.Store.Serialize()
|
||||
if err != nil {
|
||||
golog.Errorf("error while serializing the remote store: %v", err)
|
||||
}
|
||||
|
||||
err = db.Service.Put(bsid, s, WriteOptions)
|
||||
|
||||
if err != nil {
|
||||
golog.Errorf("error while writing the session(%s) to the database: %v", p.SessionID, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (db *Database) destroy(bsid []byte) error {
|
||||
return db.Service.Delete(bsid, WriteOptions)
|
||||
}
|
||||
|
||||
// Close shutdowns the LevelDB connection.
|
||||
func (db *Database) Close() error {
|
||||
return closeDB(db)
|
||||
}
|
||||
|
||||
func closeDB(db *Database) error {
|
||||
err := db.Service.Close()
|
||||
if err != nil {
|
||||
golog.Warnf("closing the LevelDB connection: %v", err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
@@ -14,9 +14,17 @@ type Database struct {
|
||||
redis *service.Service
|
||||
}
|
||||
|
||||
var _ sessions.Database = (*Database)(nil)
|
||||
|
||||
// New returns a new redis database.
|
||||
func New(cfg ...service.Config) *Database {
|
||||
db := &Database{redis: service.New(cfg...)}
|
||||
db.redis.Connect()
|
||||
_, err := db.redis.PingPong()
|
||||
if err != nil {
|
||||
golog.Debugf("error connecting to redis: %v", err)
|
||||
return nil
|
||||
}
|
||||
runtime.SetFinalizer(db, closeDB)
|
||||
return db
|
||||
}
|
||||
@@ -26,73 +34,119 @@ func (db *Database) Config() *service.Config {
|
||||
return db.redis.Config
|
||||
}
|
||||
|
||||
// Async is DEPRECATED
|
||||
// if it was true then it could use different to update the back-end storage, now it does nothing.
|
||||
func (db *Database) Async(useGoRoutines bool) *Database {
|
||||
return db
|
||||
// Acquire receives a session's lifetime from the database,
|
||||
// if the return value is LifeTime{} then the session manager sets the life time based on the expiration duration lives in configuration.
|
||||
func (db *Database) Acquire(sid string, expires time.Duration) sessions.LifeTime {
|
||||
seconds, hasExpiration, found := db.redis.TTL(sid)
|
||||
if !found {
|
||||
// not found, create an entry with ttl and return an empty lifetime, session manager will do its job.
|
||||
if err := db.redis.Set(sid, sid, int64(expires.Seconds())); err != nil {
|
||||
golog.Debug(err)
|
||||
}
|
||||
|
||||
return sessions.LifeTime{} // session manager will handle the rest.
|
||||
}
|
||||
|
||||
if !hasExpiration {
|
||||
return sessions.LifeTime{}
|
||||
|
||||
}
|
||||
|
||||
return sessions.LifeTime{Time: time.Now().Add(time.Duration(seconds) * time.Second)}
|
||||
}
|
||||
|
||||
// Load loads the values to the underline.
|
||||
func (db *Database) Load(sid string) (storeDB sessions.RemoteStore) {
|
||||
// values := make(map[string]interface{})
|
||||
const delim = "_"
|
||||
|
||||
if !db.redis.Connected { //yes, check every first time's session for valid redis connection
|
||||
db.redis.Connect()
|
||||
_, err := db.redis.PingPong()
|
||||
if err != nil {
|
||||
golog.Errorf("redis database error on connect: %v", err)
|
||||
return
|
||||
}
|
||||
func makeKey(sid, key string) string {
|
||||
return sid + delim + key
|
||||
}
|
||||
|
||||
// Set sets a key value of a specific session.
|
||||
// Ignore the "immutable".
|
||||
func (db *Database) Set(sid string, lifetime sessions.LifeTime, key string, value interface{}, immutable bool) {
|
||||
valueBytes, err := sessions.DefaultTranscoder.Marshal(value)
|
||||
if err != nil {
|
||||
golog.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
// fetch the values from this session id and copy-> store them
|
||||
storeMaybe, err := db.redis.Get(sid)
|
||||
// exists
|
||||
if err == nil {
|
||||
storeB, ok := storeMaybe.([]byte)
|
||||
if !ok {
|
||||
golog.Errorf("something wrong, store should be stored as []byte but stored as %#v", storeMaybe)
|
||||
return
|
||||
}
|
||||
|
||||
storeDB, err = sessions.DecodeRemoteStore(storeB) // decode the whole value, as a remote store
|
||||
if err != nil {
|
||||
golog.Errorf(`error while trying to load session values(%s) from redis:
|
||||
the retrieved value is not a sessions.RemoteStore type, please report that as bug, that should never occur: %v`,
|
||||
sid, err)
|
||||
}
|
||||
if err = db.redis.Set(makeKey(sid, key), valueBytes, int64(lifetime.DurationUntilExpiration().Seconds())); err != nil {
|
||||
golog.Debug(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Get retrieves a session value based on the key.
|
||||
func (db *Database) Get(sid string, key string) (value interface{}) {
|
||||
db.get(makeKey(sid, key), &value)
|
||||
return
|
||||
}
|
||||
|
||||
// Sync syncs the database.
|
||||
func (db *Database) Sync(p sessions.SyncPayload) {
|
||||
db.sync(p)
|
||||
}
|
||||
|
||||
func (db *Database) sync(p sessions.SyncPayload) {
|
||||
if p.Action == sessions.ActionDestroy {
|
||||
db.redis.Delete(p.SessionID)
|
||||
return
|
||||
}
|
||||
storeB, err := p.Store.Serialize()
|
||||
func (db *Database) get(key string, outPtr interface{}) {
|
||||
data, err := db.redis.Get(key)
|
||||
if err != nil {
|
||||
golog.Error("error while encoding the remote session store")
|
||||
// not found.
|
||||
return
|
||||
}
|
||||
|
||||
// not expire if zero
|
||||
seconds := 0
|
||||
|
||||
if lifetime := p.Store.Lifetime; !lifetime.IsZero() {
|
||||
seconds = int(lifetime.Sub(time.Now()).Seconds())
|
||||
if err = sessions.DefaultTranscoder.Unmarshal(data.([]byte), outPtr); err != nil {
|
||||
golog.Debugf("unable to unmarshal value of key: '%s': %v", key, err)
|
||||
}
|
||||
|
||||
db.redis.Set(p.SessionID, storeB, seconds)
|
||||
}
|
||||
|
||||
// Close shutdowns the redis connection.
|
||||
func (db *Database) keys(sid string) []string {
|
||||
keys, err := db.redis.GetKeys(sid + delim)
|
||||
if err != nil {
|
||||
golog.Debugf("unable to get all redis keys of session '%s': %v", sid, err)
|
||||
return nil
|
||||
}
|
||||
|
||||
return keys
|
||||
}
|
||||
|
||||
// Visit loops through all session keys and values.
|
||||
func (db *Database) Visit(sid string, cb func(key string, value interface{})) {
|
||||
keys := db.keys(sid)
|
||||
for _, key := range keys {
|
||||
var value interface{} // new value each time, we don't know what user will do in "cb".
|
||||
db.get(key, &value)
|
||||
cb(key, value)
|
||||
}
|
||||
}
|
||||
|
||||
// Len returns the length of the session's entries (keys).
|
||||
func (db *Database) Len(sid string) (n int) {
|
||||
return len(db.keys(sid))
|
||||
}
|
||||
|
||||
// Delete removes a session key value based on its key.
|
||||
func (db *Database) Delete(sid string, key string) (deleted bool) {
|
||||
err := db.redis.Delete(makeKey(sid, key))
|
||||
if err != nil {
|
||||
golog.Error(err)
|
||||
}
|
||||
return err == nil
|
||||
}
|
||||
|
||||
// Clear removes all session key values but it keeps the session entry.
|
||||
func (db *Database) Clear(sid string) {
|
||||
keys := db.keys(sid)
|
||||
for _, key := range keys {
|
||||
if err := db.redis.Delete(key); err != nil {
|
||||
golog.Debugf("unable to delete session '%s' value of key: '%s': %v", sid, key, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Release destroys the session, it clears and removes the session entry,
|
||||
// session manager will create a new session ID on the next request after this call.
|
||||
func (db *Database) Release(sid string) {
|
||||
// clear all $sid-$key.
|
||||
db.Clear(sid)
|
||||
// and remove the $sid.
|
||||
db.redis.Delete(sid)
|
||||
}
|
||||
|
||||
// Close terminates the redis connection.
|
||||
func (db *Database) Close() error {
|
||||
return closeDB(db)
|
||||
}
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/garyburd/redigo/redis"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"github.com/kataras/iris/core/errors"
|
||||
)
|
||||
|
||||
@@ -44,7 +45,7 @@ func (r *Service) CloseConnection() error {
|
||||
|
||||
// Set sets a key-value to the redis store.
|
||||
// The expiration is setted by the MaxAgeSeconds.
|
||||
func (r *Service) Set(key string, value interface{}, secondsLifetime int) (err error) {
|
||||
func (r *Service) Set(key string, value interface{}, secondsLifetime int64) (err error) {
|
||||
c := r.pool.Get()
|
||||
defer c.Close()
|
||||
if c.Err() != nil {
|
||||
@@ -81,6 +82,23 @@ func (r *Service) Get(key string) (interface{}, error) {
|
||||
return redisVal, nil
|
||||
}
|
||||
|
||||
// TTL returns the seconds to expire, if the key has expiration and error if action failed.
|
||||
// Read more at: https://redis.io/commands/ttl
|
||||
func (r *Service) TTL(key string) (seconds int64, hasExpiration bool, ok bool) {
|
||||
c := r.pool.Get()
|
||||
defer c.Close()
|
||||
redisVal, err := c.Do("TTL", r.Config.Prefix+key)
|
||||
if err != nil {
|
||||
return -2, false, false
|
||||
}
|
||||
seconds = redisVal.(int64)
|
||||
// if -1 means the key has unlimited life time.
|
||||
hasExpiration = seconds == -1
|
||||
// if -2 means key does not exist.
|
||||
ok = (c.Err() != nil || seconds == -2)
|
||||
return
|
||||
}
|
||||
|
||||
// GetAll returns all redis entries using the "SCAN" command (2.8+).
|
||||
func (r *Service) GetAll() (interface{}, error) {
|
||||
c := r.pool.Get()
|
||||
@@ -102,6 +120,48 @@ func (r *Service) GetAll() (interface{}, error) {
|
||||
return redisVal, nil
|
||||
}
|
||||
|
||||
// GetKeys returns all redis keys using the "SCAN" with MATCH command.
|
||||
// Read more at: https://redis.io/commands/scan#the-match-option.
|
||||
func (r *Service) GetKeys(prefix string) ([]string, error) {
|
||||
c := r.pool.Get()
|
||||
defer c.Close()
|
||||
if err := c.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := c.Send("SCAN", 0, "MATCH", r.Config.Prefix+prefix+"*", "COUNT", 9999999999); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := c.Flush(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
reply, err := c.Receive()
|
||||
if err != nil || reply == nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// it returns []interface, with two entries, the first one is "0" and the second one is a slice of the keys as []interface{uint8....}.
|
||||
|
||||
if keysInterface, ok := reply.([]interface{}); ok {
|
||||
if len(keysInterface) == 2 {
|
||||
// take the second, it must contain the slice of keys.
|
||||
if keysSliceAsBytes, ok := keysInterface[1].([]interface{}); ok {
|
||||
keys := make([]string, len(keysSliceAsBytes), len(keysSliceAsBytes))
|
||||
for i, k := range keysSliceAsBytes {
|
||||
keys[i] = fmt.Sprintf("%s", k)
|
||||
}
|
||||
|
||||
return keys, nil
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// GetBytes returns value, err by its key
|
||||
// you can use utils.Deserialize((.GetBytes("yourkey"),&theobject{})
|
||||
//returns nil and a filled error if something wrong happens
|
||||
|
||||
Reference in New Issue
Block a user