Skip to content

Commit

Permalink
Make Couch/replication check compatible between different versions of…
Browse files Browse the repository at this point in the history
… CouchDB
  • Loading branch information
amaltaro committed May 27, 2022
1 parent 4725b15 commit 6321bd0
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 24 deletions.
24 changes: 8 additions & 16 deletions src/python/WMComponent/AgentStatusWatcher/AgentStatusPoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,26 +167,18 @@ def collectWorkQueueInfo(self):

return results

def collectCouchDBInfo(self):
def checkCouchStatus(self):
"""
This method collects information from CouchDB, such that it can report
on any problematic replication task
This method checks whether CouchDB is running properly and it also
verifies whether all the replication tasks are progressing as expected
:return: a dictionary with the status for CouchServer
"""
couchInfo = {'name': 'CouchServer', 'status': 'ok', 'error_message': ""}

replicationDocs = self.localCouchMonitor.getSchedulerDocs()

if len(replicationDocs) != len(self.replicatorDocs):
msg = "Could not find all the replication documents in CouchDB."
msg += "Scheduler docs report the following: {}".format(replicationDocs)
couchInfo.update({'status': 'error', 'error_message': msg})
return couchInfo

for rp in replicationDocs:
if rp['state'].lower() not in ["pending", "running"]:
couchInfo['status'] = 'error'
couchInfo['error_message'] = rp.get('error_message', rp.get('info'))
for rp in self.replicatorDocs:
cInfo = self.localCouchMonitor.checkCouchServerStatus(rp['source'], rp['target'])
if cInfo['status'] != 'ok':
couchInfo.update({'status': 'error', 'error_message': cInfo['error_message']})

return couchInfo

Expand Down Expand Up @@ -214,7 +206,7 @@ def collectAgentInfo(self):
else:
agentInfo['drain_mode'] = False

couchInfo = self.collectCouchDBInfo()
couchInfo = self.checkCouchStatus()
if couchInfo['status'] != 'ok':
agentInfo['down_components'].append(couchInfo['name'])
agentInfo['status'] = couchInfo['status']
Expand Down
34 changes: 26 additions & 8 deletions src/python/WMCore/Database/CMSCouch.py
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,13 @@ def __init__(self, dburl='http://localhost:5984', usePYCurl=True, ckey=None, cer
self.ckey = ckey
self.cert = cert

def getCouchWelcome(self):
    """
    Fetch the welcome information published by CouchDB; among other
    things, it carries the server version number.
    :return: a dictionary with the welcome information
    """
    # an empty resource path is what yields the welcome document
    welcomeInfo = self.get('')
    return welcomeInfo

def listDatabases(self):
    """
    List every database hosted by this server.
    :return: whatever the '/_all_dbs' resource provides
    """
    allDatabases = self.get('/_all_dbs')
    return allDatabases
Expand Down Expand Up @@ -1149,8 +1156,10 @@ def __init__(self, couchURL):
self.couchServer = CouchServer(couchURL)

self.replicatorDB = self.couchServer.connectDatabase('_replicator', False)
# this is set as {source: {target: update_sequence}}
self.previousUpdateSequence = {}

# use the CouchDB version to decide which APIs and schema are available
couchInfo = self.couchServer.getCouchWelcome()
self.couchVersion = couchInfo.get("version")

def deleteReplicatorDocs(self, source=None, target=None, repDocs=None):
if repDocs is None:
Expand Down Expand Up @@ -1242,21 +1251,30 @@ def getSchedulerDocs(self):
def checkCouchServerStatus(self, source, target):
"""
Check the status of a specific replication task
:param source: string with the source database
:param target: string with the target database
:return: a dictionary with the current state of that replication
"""
replDocs = self.getSchedulerDocs()
passwdStrippedTarget = target.split("@")[-1]
if self.couchVersion == "1.6.1":
activeTasks = self.couchServer.getActiveTasks()
# keep only replications
activeTasks = [task for task in activeTasks if task["type"].lower() == "replication"]
else:
# then assume it's CouchDB >= 3.x
activeTasks = self.getSchedulerDocs()

for replication in replDocs:
for replication in activeTasks:
if passwdStrippedTarget in replication.get("target", ""):
if replication.get('state', '').lower() in ["pending", "running"]:
# note that CouchDB 1.6 does not have this 'state' key
if replication.get('state', 'running').lower() in ["pending", "running"]:
# now check the replication based on update_on attribute
if self.checkReplicationStatus(replication, source, passwdStrippedTarget):
if self.isReplicationOK(replication, source, passwdStrippedTarget):
return {'status': 'ok'}
else:
msg = "Replication from %s to %s is stale" % (source, target)
msg = f"Replication from {source} to {target} is stale "
msg += f"in state: {replication.get('state')}"
return {'status': 'error', 'error_message': msg}
else:
msg = "Replication stopped from %s to %s.\n" % (source, passwdStrippedTarget)
Expand All @@ -1265,7 +1283,7 @@ def checkCouchServerStatus(self, source, target):
msg = "Failed to find a replication document from %s to %s.\n" % (source, passwdStrippedTarget)
return {'status': 'down', 'error_message': msg}

def checkReplicationStatus(self, replInfo, source, target):
def isReplicationOK(self, replInfo, source, target):
"""
Checks whether a given replication task has been updated over the
last 5 minutes.
Expand Down

0 comments on commit 6321bd0

Please sign in to comment.