Skip to content

Commit

Permalink
Merge pull request #1 from mohamed-essam/add_time_fallback_option
Browse files Browse the repository at this point in the history
Add timed queue option
  • Loading branch information
smowafy authored Feb 8, 2018
2 parents 4341a9c + 6a42e2b commit 84547a3
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 5 deletions.
3 changes: 2 additions & 1 deletion crow.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ package murder
// Interface for any storage system for the orchestrator to use
type Crow interface {
QueueSize(string) int // Query main queue size
AddToQueue(string, interface{}) // Add object to queue
QueueTimeSinceCreation(string) int // Query time since queue creation
AddToQueue(string, interface{}, bool) // Add object to queue
GetQueueContents(string) []string // Retrieve all contents of queue
ClearQueue(string, string) error // Clear all queue contents
CreateLockKey(string, string, int) bool // Create lock key for a queue, confirm if lock acquired, and set TTL
Expand Down
24 changes: 22 additions & 2 deletions murder.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,23 @@ type Murder struct {
queueSize int
lockTTL int
workerGroupID string
queueAge int
}

// Add :
// Create a job in any queue
func (m *Murder) Add(obj interface{}) {
size := m.crow.QueueSize(m.workerGroupID)
if size >= m.queueSize {
ageConfigured := m.AgeConfigured()
var age int
if (ageConfigured) {
age = m.crow.QueueTimeSinceCreation(m.workerGroupID)
}
if size >= m.queueSize || (ageConfigured && age > m.queueAge) {
queueName := newUUID()
m.crow.MoveToReady(m.workerGroupID, queueName)
}
m.crow.AddToQueue(m.workerGroupID, obj)
m.crow.AddToQueue(m.workerGroupID, obj, ageConfigured)
}

// Lock :
Expand Down Expand Up @@ -79,6 +85,10 @@ func (m *Murder) Unlock(lockKey string) {
m.crow.RemoveLockKey(lockKey)
}

func (m *Murder) AgeConfigured() bool {
return (m.queueAge > 0)
}

// NewMurder :
// Returns a new instance of murder with the given options
func NewMurder(bulkSize, TTL int, crow Crow, groupID string) *Murder {
Expand All @@ -89,3 +99,13 @@ func NewMurder(bulkSize, TTL int, crow Crow, groupID string) *Murder {
workerGroupID: groupID,
}
}


// MurderWithAge:
// Returns a new instance of murder with extra option of queue age

func NewMurderWithAge(bulkSize, TTL int, crow Crow, groupID string, queueAge int) *Murder {
murder := NewMurder(bulkSize, TTL, crow, groupID)
murder.queueAge = queueAge
return murder
}
19 changes: 17 additions & 2 deletions redis-crow.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"
"log"
"time"

"strconv"
"gopkg.in/redis.v5"
)

Expand All @@ -18,12 +18,26 @@ func (c *RedisCrow) QueueSize(groupName string) int {
return int(size)
}

func (c *RedisCrow) QueueTimeSinceCreation(groupName string) int {
queueAgeKey := fmt.Sprintf("murder::%s::age", c.CurrentQueue(groupName))
timeSinceCreation, _ := c.Redis.Get(queueAgeKey).Result()
timeSinceCreationInt, _ := strconv.ParseInt(timeSinceCreation, 10, 64)
age := time.Now().Unix() - timeSinceCreationInt
return int(age)
}


func (c *RedisCrow) CurrentQueue(groupName string) string {
return fmt.Sprintf("murder::%s::mainQueue", groupName)
}

func (c *RedisCrow) AddToQueue(groupName string, obj interface{}) {
func (c *RedisCrow) AddToQueue(groupName string, obj interface{}, ageConfigured bool) {
marshalled, _ := json.Marshal(obj)
currentQueue := c.CurrentQueue(groupName)
if !c.Redis.Exists(currentQueue).Val() && ageConfigured {
queueAgeKey := fmt.Sprintf("murder::%s::age", currentQueue)
c.Redis.Set(queueAgeKey, time.Now().Unix(), time.Duration(0))
}
c.Redis.LPush(c.CurrentQueue(groupName), marshalled).Result()
}

Expand Down Expand Up @@ -81,6 +95,7 @@ func (c *RedisCrow) RemoveLockKey(lockKey string) {
func (c *RedisCrow) MoveToReady(groupName, newName string) {
ok, _ := c.Redis.SetNX(fmt.Sprintf("%s::lock", c.CurrentQueue(groupName)), "1", time.Duration(1)*time.Second).Result()
if ok {
c.Redis.Del(fmt.Sprintf("murder::%s::age", c.CurrentQueue(groupName)))
c.Redis.Rename(c.CurrentQueue(groupName), fmt.Sprintf("murder::crows::%s", newName))
c.Redis.SAdd(fmt.Sprintf("murder::%s::ready", groupName), newName)
c.Redis.Del(fmt.Sprintf("%s::lock", c.CurrentQueue(groupName)))
Expand Down

0 comments on commit 84547a3

Please sign in to comment.