From a8663e0b1a85dc732c90c90a7f665904edad4cd6 Mon Sep 17 00:00:00 2001 From: Sherif Mowafy Date: Thu, 8 Feb 2018 16:48:37 +0000 Subject: [PATCH 1/2] Change the implementation to support moving queue to ready after a time period --- crow.go | 3 ++- murder.go | 24 ++++++++++++++++++++++-- redis-crow.go | 21 +++++++++++++++++++-- 3 files changed, 43 insertions(+), 5 deletions(-) diff --git a/crow.go b/crow.go index 22aa06b..dee9feb 100644 --- a/crow.go +++ b/crow.go @@ -4,7 +4,8 @@ package murder // Interface for any storage system for the orchestrator to use type Crow interface { QueueSize(string) int // Query main queue size - AddToQueue(string, interface{}) // Add object to queue + QueueTimeSinceCreation(string) int // Query time since queue creation + AddToQueue(*Murder, interface{}) // Add object to queue GetQueueContents(string) []string // Retrieve all contents of queue ClearQueue(string, string) error // Clear all queue contents CreateLockKey(string, string, int) bool // Create lock key for a queue, confirm if lock acquired, and set TTL diff --git a/murder.go b/murder.go index eaeaa20..9d358e7 100644 --- a/murder.go +++ b/murder.go @@ -8,17 +8,23 @@ type Murder struct { queueSize int lockTTL int workerGroupID string + queueAge int } // Add : // Create a job in any queue func (m *Murder) Add(obj interface{}) { size := m.crow.QueueSize(m.workerGroupID) - if size >= m.queueSize { + ageConfigured := m.AgeConfigured() + var age int + if (ageConfigured) { + age = m.crow.QueueTimeSinceCreation(m.workerGroupID) + } + if size >= m.queueSize || (ageConfigured && age > m.queueAge) { queueName := newUUID() m.crow.MoveToReady(m.workerGroupID, queueName) } - m.crow.AddToQueue(m.workerGroupID, obj) + m.crow.AddToQueue(m, obj) } // Lock : @@ -79,6 +85,10 @@ func (m *Murder) Unlock(lockKey string) { m.crow.RemoveLockKey(lockKey) } +func (m *Murder) AgeConfigured() bool { + return (m.queueAge > 0) +} + // NewMurder : // Returns a new instance of murder with the given options func NewMurder(bulkSize, TTL int, crow Crow, groupID string) *Murder { @@ -89,3 +99,13 @@ func NewMurder(bulkSize, TTL int, crow Crow, groupID string) *Murder { workerGroupID: groupID, } } + + +// MurderWithAge: +// Returns a new instance of murder with extra option of queue age + +func NewMurderWithAge(bulkSize, TTL int, crow Crow, groupID string, queueAge int) *Murder { + murder := NewMurder(bulkSize, TTL, crow, groupID) + murder.queueAge = queueAge + return murder +} diff --git a/redis-crow.go b/redis-crow.go index 0ccf4c6..8a9fad2 100644 --- a/redis-crow.go +++ b/redis-crow.go @@ -5,7 +5,7 @@ import ( "fmt" "log" "time" - + "strconv" "gopkg.in/redis.v5" ) @@ -18,12 +18,28 @@ func (c *RedisCrow) QueueSize(groupName string) int { return int(size) } +func (c *RedisCrow) QueueTimeSinceCreation(groupName string) int { + queueAgeKey := fmt.Sprintf("murder::%s::age", c.CurrentQueue(groupName)) + timeSinceCreation, _ := c.Redis.Get(queueAgeKey).Result() + timeSinceCreationInt, _ := strconv.ParseInt(timeSinceCreation, 10, 64) + age := time.Now().Unix() - timeSinceCreationInt + return int(age) +} + + func (c *RedisCrow) CurrentQueue(groupName string) string { return fmt.Sprintf("murder::%s::mainQueue", groupName) } -func (c *RedisCrow) AddToQueue(groupName string, obj interface{}) { +func (c *RedisCrow) AddToQueue(murder *Murder, obj interface{}) { + groupName := murder.workerGroupID + ageConfigured := murder.AgeConfigured() marshalled, _ := json.Marshal(obj) + currentQueue := c.CurrentQueue(groupName) + if !c.Redis.Exists(currentQueue).Val() && ageConfigured { + queueAgeKey := fmt.Sprintf("murder::%s::age", currentQueue) + c.Redis.Set(queueAgeKey, time.Now().Unix(), time.Duration(0)) + } c.Redis.LPush(c.CurrentQueue(groupName), marshalled).Result() } @@ -81,6 +97,7 @@ func (c *RedisCrow) RemoveLockKey(lockKey string) { func (c *RedisCrow) MoveToReady(groupName, newName string) { ok, _ := c.Redis.SetNX(fmt.Sprintf("%s::lock", c.CurrentQueue(groupName)), "1", time.Duration(1)*time.Second).Result() if ok { + c.Redis.Del(fmt.Sprintf("murder::%s::age", c.CurrentQueue(groupName))) c.Redis.Rename(c.CurrentQueue(groupName), fmt.Sprintf("murder::crows::%s", newName)) c.Redis.SAdd(fmt.Sprintf("murder::%s::ready", groupName), newName) c.Redis.Del(fmt.Sprintf("%s::lock", c.CurrentQueue(groupName))) From 6a42e2be4d55cf1472a9c4eb5e880f3d3b6f2d7f Mon Sep 17 00:00:00 2001 From: Sherif Mowafy Date: Thu, 8 Feb 2018 18:03:48 +0000 Subject: [PATCH 2/2] Change crow AddQueue method to accept individual attributes instead of the murder object --- crow.go | 2 +- murder.go | 2 +- redis-crow.go | 4 +--- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/crow.go b/crow.go index dee9feb..3982a5d 100644 --- a/crow.go +++ b/crow.go @@ -5,7 +5,7 @@ package murder type Crow interface { QueueSize(string) int // Query main queue size QueueTimeSinceCreation(string) int // Query time since queue creation - AddToQueue(*Murder, interface{}) // Add object to queue + AddToQueue(string, interface{}, bool) // Add object to queue GetQueueContents(string) []string // Retrieve all contents of queue ClearQueue(string, string) error // Clear all queue contents CreateLockKey(string, string, int) bool // Create lock key for a queue, confirm if lock acquired, and set TTL diff --git a/murder.go b/murder.go index 9d358e7..13abac5 100644 --- a/murder.go +++ b/murder.go @@ -24,7 +24,7 @@ func (m *Murder) Add(obj interface{}) { queueName := newUUID() m.crow.MoveToReady(m.workerGroupID, queueName) } - m.crow.AddToQueue(m, obj) + m.crow.AddToQueue(m.workerGroupID, obj, ageConfigured) } // Lock : diff --git a/redis-crow.go b/redis-crow.go index 8a9fad2..d28ee33 100644 --- a/redis-crow.go +++ b/redis-crow.go @@ -31,9 +31,7 @@ func (c *RedisCrow) CurrentQueue(groupName string) string { return fmt.Sprintf("murder::%s::mainQueue", groupName) } -func (c *RedisCrow) AddToQueue(murder *Murder, obj interface{}) { - groupName := murder.workerGroupID - ageConfigured := murder.AgeConfigured() +func (c *RedisCrow) AddToQueue(groupName string, obj interface{}, ageConfigured bool) { marshalled, _ := json.Marshal(obj) currentQueue := c.CurrentQueue(groupName) if !c.Redis.Exists(currentQueue).Val() && ageConfigured {