From c0a987dad18c455a76427475d34da6ddb3f3e6bf Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Wed, 17 Apr 2019 21:22:02 +0800 Subject: [PATCH 01/12] updated cron schedule logic --- crawlab/app.py | 18 +++++++++--------- crawlab/config.py | 25 +++++++++++++++++++++---- crawlab/routes/schedules.py | 4 ++++ crawlab/routes/tasks.py | 13 +++++++++++++ crawlab/tasks/celery.py | 5 +++++ crawlab/tasks/scheduler.py | 27 ++++++++++----------------- 6 files changed, 62 insertions(+), 30 deletions(-) diff --git a/crawlab/app.py b/crawlab/app.py index b7a245b34..4eebc804d 100644 --- a/crawlab/app.py +++ b/crawlab/app.py @@ -88,17 +88,17 @@ def update_nodes_status_online(event): recv.capture(limit=None, timeout=None, wakeup=True) -if __name__ == '__main__': - # create folder if it does not exist - if not os.path.exists(PROJECT_LOGS_FOLDER): - os.makedirs(PROJECT_LOGS_FOLDER) +# run scheduler as a separate process +scheduler.run() - # run scheduler as a separate process - scheduler.run() +# monitor node status +p_monitor = Process(target=monitor_nodes_status, args=(celery_app,)) +p_monitor.start() - # monitor node status - p_monitor = Process(target=monitor_nodes_status, args=(celery_app,)) - p_monitor.start() +# create folder if it does not exist +if not os.path.exists(PROJECT_LOGS_FOLDER): + os.makedirs(PROJECT_LOGS_FOLDER) +if __name__ == '__main__': # run app instance app.run(host=FLASK_HOST, port=FLASK_PORT, threaded=True) diff --git a/crawlab/config.py b/crawlab/config.py index 4ede83b78..d2d69f816 100644 --- a/crawlab/config.py +++ b/crawlab/config.py @@ -1,33 +1,50 @@ -# project variables # 爬虫源码路径 PROJECT_SOURCE_FILE_FOLDER = '../spiders' + # 配置python虚拟环境的路径 PYTHON_ENV_PATH = '/Users/chennan/Desktop/2019/env/bin/python' + # 爬虫部署路径 PROJECT_DEPLOY_FILE_FOLDER = '../deployfile' +# 爬虫日志路径 PROJECT_LOGS_FOLDER = '../deployfile/logs' + +# 打包临时文件夹 PROJECT_TMP_FOLDER = '/tmp' -# celery variables +# Celery中间者URL BROKER_URL = 'redis://127.0.0.1:6379/0' + +# Celery后台URL CELERY_RESULT_BACKEND = 'mongodb://127.0.0.1:27017/' + +# Celery MongoDB设置 CELERY_MONGODB_BACKEND_SETTINGS = { 'database': 'crawlab_test', 'taskmeta_collection': 'tasks_celery', } + +# Celery时区 CELERY_TIMEZONE = 'Asia/Shanghai' + +# 是否启用UTC CELERY_ENABLE_UTC = True +# Celery Scheduler Redis URL +CELERY_BEAT_SCHEDULER = 'utils.redisbeat.RedisScheduler' +CELERY_REDIS_SCHEDULER_URL = 'redis://localhost:6379' +CELERY_REDIS_SCHEDULER_KEY = 'celery:beat:order_tasks' + # flower variables FLOWER_API_ENDPOINT = 'http://localhost:5555/api' -# database variables +# MongoDB 变量 MONGO_HOST = '127.0.0.1' MONGO_PORT = 27017 MONGO_DB = 'crawlab_test' -# flask variables +# Flask 变量 DEBUG = True FLASK_HOST = '127.0.0.1' FLASK_PORT = 8000 diff --git a/crawlab/routes/schedules.py b/crawlab/routes/schedules.py index f966e2cb9..532a4ec53 100644 --- a/crawlab/routes/schedules.py +++ b/crawlab/routes/schedules.py @@ -5,6 +5,7 @@ from constants.task import TaskStatus from db.manager import db_manager from routes.base import BaseApi +from tasks.scheduler import scheduler from utils import jsonify from utils.spider import get_spider_col_fields @@ -18,3 +19,6 @@ class ScheduleApi(BaseApi): ('cron', str), ('spider_id', str) ) + + def after_update(self, id: str = None): + scheduler.update() diff --git a/crawlab/routes/tasks.py b/crawlab/routes/tasks.py index 59e8469bf..ebe5a33b1 100644 --- a/crawlab/routes/tasks.py +++ b/crawlab/routes/tasks.py @@ -60,13 +60,26 @@ def get(self, id: str = None, action: str = None): sort_key='create_ts') items = [] for task in tasks: + # 
celery tasks # _task = db_manager.get('tasks_celery', id=task['_id']) + + # get spider _spider = db_manager.get(col_name='spiders', id=str(task['spider_id'])) + + # status if task.get('status') is None: task['status'] = TaskStatus.UNAVAILABLE + + # spider name if _spider: task['spider_name'] = _spider['name'] + + # duration + if task.get('finish_ts') is not None: + task['duration'] = (task['finish_ts'] - task['create_ts']).total_seconds() + items.append(task) + return { 'status': 'ok', 'total_count': db_manager.count('tasks', {}), diff --git a/crawlab/tasks/celery.py b/crawlab/tasks/celery.py index 3def1b29e..8cb7501b3 100644 --- a/crawlab/tasks/celery.py +++ b/crawlab/tasks/celery.py @@ -1,5 +1,10 @@ from celery import Celery +# from redisbeat.scheduler import RedisScheduler +from utils.redisbeat import RedisScheduler # celery app instance celery_app = Celery(__name__) celery_app.config_from_object('config') + +# RedisBeat scheduler +celery_scheduler = RedisScheduler(app=celery_app) diff --git a/crawlab/tasks/scheduler.py b/crawlab/tasks/scheduler.py index da6303c9a..b03639ea4 100644 --- a/crawlab/tasks/scheduler.py +++ b/crawlab/tasks/scheduler.py @@ -2,24 +2,27 @@ from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.mongodb import MongoDBJobStore from pymongo import MongoClient -from flask import current_app from config import MONGO_DB, MONGO_HOST, MONGO_PORT, FLASK_HOST, FLASK_PORT from constants.spider import CronEnabled from db.manager import db_manager +from tasks.celery import celery_scheduler class Scheduler(object): mongo = MongoClient(host=MONGO_HOST, port=MONGO_PORT) + task_col = 'apscheduler_jobs' jobstores = { 'mongo': MongoDBJobStore(database=MONGO_DB, - collection='apscheduler_jobs', + collection=task_col, client=mongo) } scheduler = BackgroundScheduler(jobstores=jobstores) + # scheduler = celery_scheduler + def execute_spider(self, id: str): r = requests.get('http://%s:%s/api/spiders/%s/on_crawl' % ( FLASK_HOST, @@ -27,21 +30,13 @@ def execute_spider(self, id: str): id )) - def restart(self): - self.scheduler.shutdown() - self.scheduler.start() - current_app.logger.info('restarted') - def update(self): - current_app.logger.info('updating...') - # remove all existing periodic jobs - self.scheduler.remove_all_jobs() + self.mongo[MONGO_DB][self.task_col].remove() - # add new periodic jobs from database - spiders = db_manager.list('spiders', {'cron_enabled': CronEnabled.ON}) - for spider in spiders: - cron = spider.get('cron') + periodical_tasks = db_manager.list('schedules', {}) + for task in periodical_tasks: + cron = task.get('cron') cron_arr = cron.split(' ') second = cron_arr[0] minute = cron_arr[1] @@ -49,13 +44,11 @@ def update(self): day = cron_arr[3] month = cron_arr[4] day_of_week = cron_arr[5] - self.scheduler.add_job(func=self.execute_spider, trigger='cron', args=(str(spider['_id']),), + self.scheduler.add_job(func=self.execute_spider, trigger='cron', args=(str(task['spider_id']),), jobstore='mongo', day_of_week=day_of_week, month=month, day=day, hour=hour, minute=minute, second=second) - current_app.logger.info('updated') - def run(self): self.update() self.scheduler.start() From daeaa05d85122b055a4963afaa2e94958a867062 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Wed, 17 Apr 2019 21:50:30 +0800 Subject: [PATCH 02/12] added filter for TaskList --- .../components/InfoView/SpiderInfoView.vue | 40 +++++++++--------- frontend/src/store/modules/task.js | 11 ++++- frontend/src/views/task/TaskList.vue | 42 +++++++++++++------ 
3 files changed, 59 insertions(+), 34 deletions(-) diff --git a/frontend/src/components/InfoView/SpiderInfoView.vue b/frontend/src/components/InfoView/SpiderInfoView.vue index a02088ada..6bc1a1578 100644 --- a/frontend/src/components/InfoView/SpiderInfoView.vue +++ b/frontend/src/components/InfoView/SpiderInfoView.vue @@ -38,26 +38,26 @@ - - - - - - - - + + + + + + + + + + + + + + + + + + + + diff --git a/frontend/src/store/modules/task.js b/frontend/src/store/modules/task.js index c421a79b7..02a238411 100644 --- a/frontend/src/store/modules/task.js +++ b/frontend/src/store/modules/task.js @@ -9,6 +9,11 @@ const state = { taskResultsData: [], taskResultsColumns: [], taskResultsTotalCount: 0, + // filter + filter: { + node_id: '', + spider_id: '' + }, // pagination pageNum: 0, pageSize: 10, @@ -68,7 +73,11 @@ const actions = { getTaskList ({ state, commit }) { return request.get('/tasks', { page_num: state.pageNum, - page_size: state.pageSize + page_size: state.pageSize, + filter: { + node_id: state.filter.node_id || undefined, + spider_id: state.filter.spider_id || undefined + } }) .then(response => { commit('SET_TASK_LIST', response.data.items) diff --git a/frontend/src/views/task/TaskList.vue b/frontend/src/views/task/TaskList.vue index 87fa68043..fe40fe5c7 100644 --- a/frontend/src/views/task/TaskList.vue +++ b/frontend/src/views/task/TaskList.vue @@ -2,20 +2,22 @@
  [template hunk (markup not recoverable from this extraction): the filter bar's keyword
   search box and {{$t('Refresh')}} button are replaced by node and spider filter selects,
   backed by nodeList and spiderList from the store, plus a {{$t('Search')}} button]
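The store change in this patch serializes the new filter object into a 'filter' query parameter, and PATCH 03/12 below teaches the task API to decode it. A minimal sketch of that decoding is shown here for orientation; the helper name is illustrative, and in the series the logic sits inline in crawlab/routes/tasks.py:

    import json
    from bson import ObjectId  # bundled with pymongo

    def parse_task_filter(filter_str):
        # Decode the JSON 'filter' query parameter into a MongoDB query condition.
        # The frontend already omits empty selections via `|| undefined`, so only
        # chosen filters arrive here.
        cond = {}
        if filter_str is not None:
            cond = json.loads(filter_str)
            # spider_id is stored as an ObjectId, so the plain string must be cast
            if cond.get('spider_id'):
                cond['spider_id'] = ObjectId(cond['spider_id'])
        return cond

    # example: a task-list query with a spider selected and the node filter left empty
    print(parse_task_filter('{"spider_id": "5cb6d3b6d18c455a76427475"}'))

The same condition object is reused for the total count, so pagination stays consistent with the filtered list.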
@@ -102,26 +104,31 @@ export default { return { isEditMode: false, dialogVisible: false, - filter: { - keyword: '' - }, // tableData, columns: [ { name: 'create_ts', label: 'Create Time', width: '150' }, { name: 'start_ts', label: 'Start Time', width: '150' }, { name: 'finish_ts', label: 'Finish Time', width: '150' }, + { name: 'duration', label: 'Duration (sec)', width: '80' }, { name: 'spider_name', label: 'Spider', width: '160' }, { name: 'node_id', label: 'Node', width: '160' }, - { name: 'status', label: 'Status', width: '160', sortable: true } + { name: 'status', label: 'Status', width: '80' } ] } }, computed: { ...mapState('task', [ + 'filter', 'taskList', 'taskListTotalCount', 'taskForm' ]), + ...mapState('spider', [ + 'spiderList' + ]), + ...mapState('node', [ + 'nodeList' + ]), pageNum: { get () { return this.$store.state.task.pageNum @@ -200,6 +207,8 @@ export default { }, created () { this.$store.dispatch('task/getTaskList') + this.$store.dispatch('spider/getSpiderList') + this.$store.dispatch('node/getNodeList') } } @@ -215,6 +224,13 @@ export default { display: flex; justify-content: space-between; + .left { + .filter-select { + width: 180px; + margin-right: 10px; + } + } + .filter-search { width: 240px; } From 82ac9bec72e2da95f1b8017e05eec00b23efd599 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Wed, 17 Apr 2019 22:07:48 +0800 Subject: [PATCH 03/12] updated scheduler --- crawlab/routes/base.py | 4 ++-- crawlab/routes/tasks.py | 10 ++++++++-- crawlab/tasks/celery.py | 5 ----- crawlab/tasks/scheduler.py | 6 +++--- 4 files changed, 13 insertions(+), 12 deletions(-) diff --git a/crawlab/routes/base.py b/crawlab/routes/base.py index 8a3d67090..1578b3f84 100644 --- a/crawlab/routes/base.py +++ b/crawlab/routes/base.py @@ -23,7 +23,7 @@ def __init__(self): super(BaseApi).__init__() self.parser.add_argument('page_num', type=int) self.parser.add_argument('page_size', type=int) - self.parser.add_argument('filter', type=dict) + self.parser.add_argument('filter', type=str) for arg, type in self.arguments: self.parser.add_argument(arg, type=type) @@ -109,7 +109,7 @@ def put(self) -> (dict, tuple): item[k] = args.get(k) item = db_manager.save(col_name=self.col_name, item=item) - self.after_update(item._id) + self.after_update() return item diff --git a/crawlab/routes/tasks.py b/crawlab/routes/tasks.py index ebe5a33b1..86e75dabb 100644 --- a/crawlab/routes/tasks.py +++ b/crawlab/routes/tasks.py @@ -56,7 +56,13 @@ def get(self, id: str = None, action: str = None): args = self.parser.parse_args() page_size = args.get('page_size') or 10 page_num = args.get('page_num') or 1 - tasks = db_manager.list(col_name=self.col_name, cond={}, limit=page_size, skip=page_size * (page_num - 1), + filter_str = args.get('filter') + filter_ = {} + if filter_str is not None: + filter_ = json.loads(filter_str) + if filter_.get('spider_id'): + filter_['spider_id'] = ObjectId(filter_['spider_id']) + tasks = db_manager.list(col_name=self.col_name, cond=filter_, limit=page_size, skip=page_size * (page_num - 1), sort_key='create_ts') items = [] for task in tasks: @@ -82,7 +88,7 @@ def get(self, id: str = None, action: str = None): return { 'status': 'ok', - 'total_count': db_manager.count('tasks', {}), + 'total_count': db_manager.count('tasks', filter_), 'page_num': page_num, 'page_size': page_size, 'items': jsonify(items) diff --git a/crawlab/tasks/celery.py b/crawlab/tasks/celery.py index 8cb7501b3..3def1b29e 100644 --- a/crawlab/tasks/celery.py +++ b/crawlab/tasks/celery.py @@ -1,10 +1,5 @@ from celery import 
Celery -# from redisbeat.scheduler import RedisScheduler -from utils.redisbeat import RedisScheduler # celery app instance celery_app = Celery(__name__) celery_app.config_from_object('config') - -# RedisBeat scheduler -celery_scheduler = RedisScheduler(app=celery_app) diff --git a/crawlab/tasks/scheduler.py b/crawlab/tasks/scheduler.py index b03639ea4..bf29607f6 100644 --- a/crawlab/tasks/scheduler.py +++ b/crawlab/tasks/scheduler.py @@ -6,23 +6,22 @@ from config import MONGO_DB, MONGO_HOST, MONGO_PORT, FLASK_HOST, FLASK_PORT from constants.spider import CronEnabled from db.manager import db_manager -from tasks.celery import celery_scheduler class Scheduler(object): mongo = MongoClient(host=MONGO_HOST, port=MONGO_PORT) task_col = 'apscheduler_jobs' + # scheduler jobstore jobstores = { 'mongo': MongoDBJobStore(database=MONGO_DB, collection=task_col, client=mongo) } + # scheduler instance scheduler = BackgroundScheduler(jobstores=jobstores) - # scheduler = celery_scheduler - def execute_spider(self, id: str): r = requests.get('http://%s:%s/api/spiders/%s/on_crawl' % ( FLASK_HOST, @@ -32,6 +31,7 @@ def execute_spider(self, id: str): def update(self): # remove all existing periodic jobs + self.scheduler.remove_all_jobs() self.mongo[MONGO_DB][self.task_col].remove() periodical_tasks = db_manager.list('schedules', {}) From 4b4c48649141b8eeec5c1fcbfb1afb733cd8f2ab Mon Sep 17 00:00:00 2001 From: casperwnb <842590178@qq.com> Date: Fri, 19 Apr 2019 11:35:14 +0800 Subject: [PATCH 04/12] modify config setting --- crawlab/config/__init__.py | 10 +++++++++ crawlab/{ => config}/config.py | 0 crawlab/config/config_local.py | 38 ++++++++++++++++++++++++++++++++++ crawlab/utils/node.py | 14 ++++++++++++- 4 files changed, 61 insertions(+), 1 deletion(-) create mode 100644 crawlab/config/__init__.py rename crawlab/{ => config}/config.py (100%) create mode 100644 crawlab/config/config_local.py diff --git a/crawlab/config/__init__.py b/crawlab/config/__init__.py new file mode 100644 index 000000000..609b69de0 --- /dev/null +++ b/crawlab/config/__init__.py @@ -0,0 +1,10 @@ +# encoding: utf-8 + +import os + +run_env = os.environ.get("RUNENV", "local") + +if run_env == "local": # 加载本地配置 + from config.config_local import * +else: + from config.config import * diff --git a/crawlab/config.py b/crawlab/config/config.py similarity index 100% rename from crawlab/config.py rename to crawlab/config/config.py diff --git a/crawlab/config/config_local.py b/crawlab/config/config_local.py new file mode 100644 index 000000000..69d302773 --- /dev/null +++ b/crawlab/config/config_local.py @@ -0,0 +1,38 @@ +# encoding: utf-8 + +import os +BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +PROJECT_SOURCE_FILE_FOLDER = os.path.join(BASE_DIR, "spiders") + +# 配置python虚拟环境的路径 +PYTHON_ENV_PATH = '/Users/chennan/Desktop/2019/env/bin/python' + +# 爬虫部署路径 +PROJECT_DEPLOY_FILE_FOLDER = os.path.join(BASE_DIR, 'deployfile') + +PROJECT_LOGS_FOLDER = os.path.join(BASE_DIR, 'deployfile/logs') +PROJECT_TMP_FOLDER = '/tmp' + +# celery variables +BROKER_URL = 'redis://127.0.0.1:56379/0' +CELERY_RESULT_BACKEND = 'mongodb://127.0.0.1:57017/' +CELERY_MONGODB_BACKEND_SETTINGS = { + 'database': 'crawlab_test', + 'taskmeta_collection': 'tasks_celery', +} +CELERY_TIMEZONE = 'Asia/Shanghai' +CELERY_ENABLE_UTC = True + +# flower variables +FLOWER_API_ENDPOINT = 'http://localhost:5555/api' + +# database variables +MONGO_HOST = '127.0.0.1' +MONGO_PORT = 57017 +MONGO_DB = 'crawlab_test' + +# flask variables +DEBUG = True 
+FLASK_HOST = '127.0.0.1' +FLASK_PORT = 8000 diff --git a/crawlab/utils/node.py b/crawlab/utils/node.py index c6cd47be4..6e40bc2bf 100644 --- a/crawlab/utils/node.py +++ b/crawlab/utils/node.py @@ -8,16 +8,27 @@ def check_nodes_status(): + """ + Update node status from Flower. + """ res = requests.get('%s/workers?status=1' % FLOWER_API_ENDPOINT) return json.loads(res.content.decode('utf-8')) def update_nodes_status(refresh=False): + """ + Update all nodes status + :param refresh: + """ online_node_ids = [] url = '%s/workers?status=1' % FLOWER_API_ENDPOINT if refresh: url += '&refresh=1' + res = requests.get(url) + if res.status_code != 200: + return online_node_ids + for k, v in json.loads(res.content.decode('utf-8')).items(): node_name = k node_status = NodeStatus.ONLINE if v else NodeStatus.OFFLINE @@ -26,9 +37,10 @@ def update_nodes_status(refresh=False): # new node if node is None: - node = {'_id': node_name, 'name': node_name, 'status': node_status} + node = {'_id': node_name, 'name': node_name, 'status': node_status, 'ip': 'localhost', 'port': '8000'} db_manager.save('nodes', node) + # existing node else: node['status'] = node_status db_manager.save('nodes', node) From 9a00be811e057a473d80e8867558d4f9a944d459 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Sat, 20 Apr 2019 20:04:09 +0800 Subject: [PATCH 05/12] added params for spider schedules --- crawlab/config.py | 3 ++- crawlab/routes/schedules.py | 3 ++- crawlab/routes/spiders.py | 9 ++++++++- crawlab/tasks/scheduler.py | 17 +++++++++++++---- crawlab/tasks/spider.py | 13 +++++++------ frontend/src/i18n/zh.js | 5 +++++ frontend/src/views/schedule/ScheduleList.vue | 17 +++++++++++++++++ 7 files changed, 54 insertions(+), 13 deletions(-) diff --git a/crawlab/config.py b/crawlab/config.py index d2d69f816..bad08ee24 100644 --- a/crawlab/config.py +++ b/crawlab/config.py @@ -5,7 +5,8 @@ PYTHON_ENV_PATH = '/Users/chennan/Desktop/2019/env/bin/python' # 爬虫部署路径 -PROJECT_DEPLOY_FILE_FOLDER = '../deployfile' +# PROJECT_DEPLOY_FILE_FOLDER = '../deployfile' +PROJECT_DEPLOY_FILE_FOLDER = '/var/crawlab' # 爬虫日志路径 PROJECT_LOGS_FOLDER = '../deployfile/logs' diff --git a/crawlab/routes/schedules.py b/crawlab/routes/schedules.py index 532a4ec53..01db8be11 100644 --- a/crawlab/routes/schedules.py +++ b/crawlab/routes/schedules.py @@ -17,7 +17,8 @@ class ScheduleApi(BaseApi): ('name', str), ('description', str), ('cron', str), - ('spider_id', str) + ('spider_id', str), + ('params', str) ) def after_update(self, id: str = None): diff --git a/crawlab/routes/spiders.py b/crawlab/routes/spiders.py index f36903e3f..ba315ce9d 100644 --- a/crawlab/routes/spiders.py +++ b/crawlab/routes/spiders.py @@ -193,12 +193,19 @@ def on_crawl(self, id: str) -> (dict, tuple): :param id: spider_id :return: """ - job = execute_spider.delay(id) + args = self.parser.parse_args() + params = args.get('params') + + spider = db_manager.get('spiders', id=ObjectId(id)) + + job = execute_spider.delay(id, params) # create a new task db_manager.save('tasks', { '_id': job.id, 'spider_id': ObjectId(id), + 'cmd': spider.get('cmd'), + 'params': params, 'create_ts': datetime.utcnow(), 'status': TaskStatus.PENDING }) diff --git a/crawlab/tasks/scheduler.py b/crawlab/tasks/scheduler.py index bf29607f6..55e8fc36d 100644 --- a/crawlab/tasks/scheduler.py +++ b/crawlab/tasks/scheduler.py @@ -22,12 +22,15 @@ class Scheduler(object): # scheduler instance scheduler = BackgroundScheduler(jobstores=jobstores) - def execute_spider(self, id: str): + def execute_spider(self, id: str, params: str = 
None): + query = {} + if params is not None: + query['params'] = params r = requests.get('http://%s:%s/api/spiders/%s/on_crawl' % ( FLASK_HOST, FLASK_PORT, id - )) + ), query) def update(self): # remove all existing periodic jobs @@ -44,9 +47,15 @@ def update(self): day = cron_arr[3] month = cron_arr[4] day_of_week = cron_arr[5] - self.scheduler.add_job(func=self.execute_spider, trigger='cron', args=(str(task['spider_id']),), + self.scheduler.add_job(func=self.execute_spider, + args=(str(task['spider_id']), task.get('params'),), + trigger='cron', jobstore='mongo', - day_of_week=day_of_week, month=month, day=day, hour=hour, minute=minute, + day_of_week=day_of_week, + month=month, + day=day, + hour=hour, + minute=minute, second=second) def run(self): diff --git a/crawlab/tasks/spider.py b/crawlab/tasks/spider.py index 3413a021c..c71c3f34d 100644 --- a/crawlab/tasks/spider.py +++ b/crawlab/tasks/spider.py @@ -11,7 +11,7 @@ @celery_app.task(bind=True) -def execute_spider(self, id: str): +def execute_spider(self, id: str, params: str = None): """ Execute spider task. :param self: @@ -23,6 +23,8 @@ def execute_spider(self, id: str): command = spider.get('cmd') if command.startswith("env"): command = PYTHON_ENV_PATH + command.replace("env", "") + if params is not None: + command += ' ' + params current_working_directory = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, str(spider.get('_id'))) @@ -43,7 +45,7 @@ def execute_spider(self, id: str): stdout = open(log_file_path, 'a') stderr = open(log_file_path, 'a') - # create a new task + # update task status as started db_manager.update_one('tasks', id=task_id, values={ 'start_ts': datetime.utcnow(), 'node_id': hostname, @@ -68,7 +70,9 @@ def execute_spider(self, id: str): env['CRAWLAB_COLLECTION'] = spider.get('col') # start process - p = subprocess.Popen(command.split(' '), + cmd_arr = command.split(' ') + cmd_arr = list(filter(lambda x: x != '', cmd_arr)) + p = subprocess.Popen(cmd_arr, stdout=stdout.fileno(), stderr=stderr.fileno(), cwd=current_working_directory, @@ -87,9 +91,6 @@ def execute_spider(self, id: str): # save task when the task is finished db_manager.update_one('tasks', id=task_id, values={ - 'node_id': hostname, - 'hostname': hostname, - 'log_file_path': log_file_path, 'finish_ts': datetime.utcnow(), 'status': status }) diff --git a/frontend/src/i18n/zh.js b/frontend/src/i18n/zh.js index 7e127114b..a437dcf1c 100644 --- a/frontend/src/i18n/zh.js +++ b/frontend/src/i18n/zh.js @@ -111,6 +111,11 @@ export default { // 部署 'Time': '时间', + // 定时任务 + 'Schedule Name': '定时任务名称', + 'Schedule Description': '定时任务描述', + 'Parameters': '参数', + // 文件 'Choose Folder': '选择文件', diff --git a/frontend/src/views/schedule/ScheduleList.vue b/frontend/src/views/schedule/ScheduleList.vue index 3b1e33070..7bcdcdf0b 100644 --- a/frontend/src/views/schedule/ScheduleList.vue +++ b/frontend/src/views/schedule/ScheduleList.vue @@ -31,6 +31,15 @@ + + + + + + @@ -130,6 +139,14 @@ export default { ]), filteredTableData () { return this.scheduleList + }, + spider () { + for (let i = 0; i < this.spiderList.length; i++) { + if (this.spiderList[i]._id === this.scheduleForm.spider_id) { + return this.spiderList[i] + } + } + return {} } }, methods: { From 017116bb29b8dd108cfd737bd0dd8c52fac6f572 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Mon, 22 Apr 2019 21:06:08 +0800 Subject: [PATCH 06/12] added results stats for tasks --- crawlab/config/config.py | 6 +++++- crawlab/config/config_local.py | 37 +++++++++++++++++++++++++--------- crawlab/routes/tasks.py | 25 
+++++++++++++++++++++-- 3 files changed, 55 insertions(+), 13 deletions(-) diff --git a/crawlab/config/config.py b/crawlab/config/config.py index bad08ee24..afbcb9bf8 100644 --- a/crawlab/config/config.py +++ b/crawlab/config/config.py @@ -1,5 +1,9 @@ +import os + +BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + # 爬虫源码路径 -PROJECT_SOURCE_FILE_FOLDER = '../spiders' +PROJECT_SOURCE_FILE_FOLDER = os.path.join(BASE_DIR, "spiders") # 配置python虚拟环境的路径 PYTHON_ENV_PATH = '/Users/chennan/Desktop/2019/env/bin/python' diff --git a/crawlab/config/config_local.py b/crawlab/config/config_local.py index 69d302773..afbcb9bf8 100644 --- a/crawlab/config/config_local.py +++ b/crawlab/config/config_local.py @@ -1,38 +1,55 @@ -# encoding: utf-8 - import os + BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +# 爬虫源码路径 PROJECT_SOURCE_FILE_FOLDER = os.path.join(BASE_DIR, "spiders") # 配置python虚拟环境的路径 PYTHON_ENV_PATH = '/Users/chennan/Desktop/2019/env/bin/python' # 爬虫部署路径 -PROJECT_DEPLOY_FILE_FOLDER = os.path.join(BASE_DIR, 'deployfile') +# PROJECT_DEPLOY_FILE_FOLDER = '../deployfile' +PROJECT_DEPLOY_FILE_FOLDER = '/var/crawlab' -PROJECT_LOGS_FOLDER = os.path.join(BASE_DIR, 'deployfile/logs') +# 爬虫日志路径 +PROJECT_LOGS_FOLDER = '../deployfile/logs' + +# 打包临时文件夹 PROJECT_TMP_FOLDER = '/tmp' -# celery variables -BROKER_URL = 'redis://127.0.0.1:56379/0' -CELERY_RESULT_BACKEND = 'mongodb://127.0.0.1:57017/' +# Celery中间者URL +BROKER_URL = 'redis://127.0.0.1:6379/0' + +# Celery后台URL +CELERY_RESULT_BACKEND = 'mongodb://127.0.0.1:27017/' + +# Celery MongoDB设置 CELERY_MONGODB_BACKEND_SETTINGS = { 'database': 'crawlab_test', 'taskmeta_collection': 'tasks_celery', } + +# Celery时区 CELERY_TIMEZONE = 'Asia/Shanghai' + +# 是否启用UTC CELERY_ENABLE_UTC = True +# Celery Scheduler Redis URL +CELERY_BEAT_SCHEDULER = 'utils.redisbeat.RedisScheduler' +CELERY_REDIS_SCHEDULER_URL = 'redis://localhost:6379' +CELERY_REDIS_SCHEDULER_KEY = 'celery:beat:order_tasks' + # flower variables FLOWER_API_ENDPOINT = 'http://localhost:5555/api' -# database variables +# MongoDB 变量 MONGO_HOST = '127.0.0.1' -MONGO_PORT = 57017 +MONGO_PORT = 27017 MONGO_DB = 'crawlab_test' -# flask variables +# Flask 变量 DEBUG = True FLASK_HOST = '127.0.0.1' FLASK_PORT = 8000 diff --git a/crawlab/routes/tasks.py b/crawlab/routes/tasks.py index 86e75dabb..2afb0cf96 100644 --- a/crawlab/routes/tasks.py +++ b/crawlab/routes/tasks.py @@ -42,9 +42,21 @@ def get(self, id: str = None, action: str = None): elif id is not None: task = db_manager.get(col_name=self.col_name, id=id) spider = db_manager.get(col_name='spiders', id=str(task['spider_id'])) - task['spider_name'] = spider['name'] + + # spider + task['num_results'] = 0 + if spider: + task['spider_name'] = spider['name'] + if spider.get('col'): + col = spider.get('col') + num_results = db_manager.count(col, {'task_id': task['_id']}) + task['num_results'] = num_results + + # duration if task.get('finish_ts') is not None: task['duration'] = (task['finish_ts'] - task['create_ts']).total_seconds() + task['avg_num_results'] = round(task['num_results'] / task['duration'], 1) + try: with open(task['log_file_path']) as f: task['log'] = f.read() @@ -76,13 +88,22 @@ def get(self, id: str = None, action: str = None): if task.get('status') is None: task['status'] = TaskStatus.UNAVAILABLE - # spider name + # spider + task['num_results'] = 0 if _spider: + # spider name task['spider_name'] = _spider['name'] + # number of results + if _spider.get('col'): + col = 
_spider.get('col') + num_results = db_manager.count(col, {'task_id': task['_id']}) + task['num_results'] = num_results + # duration if task.get('finish_ts') is not None: task['duration'] = (task['finish_ts'] - task['create_ts']).total_seconds() + task['avg_num_results'] = round(task['num_results'] / task['duration'], 1) items.append(task) From 4dff16e9ec93e000f26cf7253b68f4abd67e8880 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Mon, 22 Apr 2019 21:26:58 +0800 Subject: [PATCH 07/12] added filter for TaskList --- frontend/src/components/InfoView/TaskInfoView.vue | 7 +++++++ frontend/src/i18n/zh.js | 2 ++ frontend/src/views/task/TaskList.vue | 10 ++++++---- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/frontend/src/components/InfoView/TaskInfoView.vue b/frontend/src/components/InfoView/TaskInfoView.vue index 8b6fdd16a..92ad58690 100644 --- a/frontend/src/components/InfoView/TaskInfoView.vue +++ b/frontend/src/components/InfoView/TaskInfoView.vue @@ -30,6 +30,13 @@ + + + + + + +
{{taskForm.log}} diff --git a/frontend/src/i18n/zh.js b/frontend/src/i18n/zh.js index a437dcf1c..69147f672 100644 --- a/frontend/src/i18n/zh.js +++ b/frontend/src/i18n/zh.js @@ -101,6 +101,8 @@ export default { 'Finish Timestamp': '完成时间', 'Duration (sec)': '用时(秒)', 'Error Message': '错误信息', + 'Results Count': '结果数', + 'Average Results Count per Second': '抓取速度(个/秒)', // 任务列表 'Node': '节点', diff --git a/frontend/src/views/task/TaskList.vue b/frontend/src/views/task/TaskList.vue index fe40fe5c7..206873346 100644 --- a/frontend/src/views/task/TaskList.vue +++ b/frontend/src/views/task/TaskList.vue @@ -106,12 +106,14 @@ export default { dialogVisible: false, // tableData, columns: [ - { name: 'create_ts', label: 'Create Time', width: '150' }, - { name: 'start_ts', label: 'Start Time', width: '150' }, - { name: 'finish_ts', label: 'Finish Time', width: '150' }, + { name: 'create_ts', label: 'Create Time', width: '100' }, + { name: 'start_ts', label: 'Start Time', width: '100' }, + { name: 'finish_ts', label: 'Finish Time', width: '100' }, { name: 'duration', label: 'Duration (sec)', width: '80' }, - { name: 'spider_name', label: 'Spider', width: '160' }, + { name: 'spider_name', label: 'Spider', width: '120' }, { name: 'node_id', label: 'Node', width: '160' }, + { name: 'num_results', label: 'Results Count', width: '80' }, + { name: 'avg_num_results', label: 'Average Results Count per Second', width: '80' }, { name: 'status', label: 'Status', width: '80' } ] } From db1cd7ec9b57ec36e412de4ce1717b35cfe57a2a Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Tue, 23 Apr 2019 19:33:50 +0800 Subject: [PATCH 08/12] fix issue #12 --- crawlab/db/manager.py | 4 ++++ crawlab/tasks/spider.py | 21 +++++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/crawlab/db/manager.py b/crawlab/db/manager.py index d210b81c4..c1c6d88ab 100644 --- a/crawlab/db/manager.py +++ b/crawlab/db/manager.py @@ -175,5 +175,9 @@ def aggregate(self, col_name: str, pipelines, **kwargs): col = self.db[col_name] return col.aggregate(pipelines, **kwargs) + def create_index(self, col_name: str, keys: dict, **kwargs): + col = self.db[col_name] + col.create_index(keys=keys, **kwargs) + db_manager = DbManager() diff --git a/crawlab/tasks/spider.py b/crawlab/tasks/spider.py index c71c3f34d..8a8449684 100644 --- a/crawlab/tasks/spider.py +++ b/crawlab/tasks/spider.py @@ -1,5 +1,6 @@ import os from datetime import datetime +from time import sleep from bson import ObjectId from config import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_LOGS_FOLDER, PYTHON_ENV_PATH @@ -10,6 +11,17 @@ from utils.log import other as logger +def get_task(id: str): + i = 0 + while i < 5: + task = db_manager.get('tasks', id=id) + if task is not None: + return task + i += 1 + sleep(1) + return None + + @celery_app.task(bind=True) def execute_spider(self, id: str, params: str = None): """ @@ -26,6 +38,12 @@ def execute_spider(self, id: str, params: str = None): if params is not None: command += ' ' + params + # get task object and return if not found + task = get_task(task_id) + if task is None: + return + + # current working directory current_working_directory = os.path.join(PROJECT_DEPLOY_FILE_FOLDER, str(spider.get('_id'))) # log info @@ -69,6 +87,9 @@ def execute_spider(self, id: str, params: str = None): if spider.get('col'): env['CRAWLAB_COLLECTION'] = spider.get('col') + # create index to speed results data retrieval + db_manager.create_index(spider.get('col'), {'task_id': 1}) + # start process cmd_arr = command.split(' ') cmd_arr = list(filter(lambda x: x != 
'', cmd_arr)) From 4726f6ca4d0304a021e281702405da26c61063a5 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Tue, 23 Apr 2019 19:40:33 +0800 Subject: [PATCH 09/12] fix issue #12 --- crawlab/tasks/spider.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crawlab/tasks/spider.py b/crawlab/tasks/spider.py index 8a8449684..ad47b6551 100644 --- a/crawlab/tasks/spider.py +++ b/crawlab/tasks/spider.py @@ -3,6 +3,8 @@ from time import sleep from bson import ObjectId +from pymongo import ASCENDING + from config import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_LOGS_FOLDER, PYTHON_ENV_PATH from constants.task import TaskStatus from db.manager import db_manager @@ -88,7 +90,7 @@ def execute_spider(self, id: str, params: str = None): env['CRAWLAB_COLLECTION'] = spider.get('col') # create index to speed results data retrieval - db_manager.create_index(spider.get('col'), {'task_id': 1}) + db_manager.create_index(spider.get('col'), [('task_id', ASCENDING)]) # start process cmd_arr = command.split(' ') From db0f58f9b9bf6360892d3da6298ff8b22bd710d6 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Tue, 23 Apr 2019 19:52:06 +0800 Subject: [PATCH 10/12] request task list every 5 seconds --- frontend/src/views/task/TaskList.vue | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/frontend/src/views/task/TaskList.vue b/frontend/src/views/task/TaskList.vue index 206873346..c8daa4c3b 100644 --- a/frontend/src/views/task/TaskList.vue +++ b/frontend/src/views/task/TaskList.vue @@ -102,9 +102,16 @@ export default { name: 'TaskList', data () { return { + // setInterval handle + handle: undefined, + + // determine if is edit mode isEditMode: false, + + // dialog visibility dialogVisible: false, - // tableData, + + // table columns columns: [ { name: 'create_ts', label: 'Create Time', width: '100' }, { name: 'start_ts', label: 'Start Time', width: '100' }, @@ -211,6 +218,15 @@ export default { this.$store.dispatch('task/getTaskList') this.$store.dispatch('spider/getSpiderList') this.$store.dispatch('node/getNodeList') + }, + mounted () { + // request task list every 5 seconds + this.handle = setInterval(() => { + this.$store.dispatch('task/getTaskList') + }, 5000) + }, + destroyed () { + clearInterval(this.handle) } } From b7ed4ad442417a96c59fe4975ae3d221a05c51f0 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Tue, 23 Apr 2019 20:25:15 +0800 Subject: [PATCH 11/12] added stats for spider --- crawlab/db/manager.py | 2 +- crawlab/routes/spiders.py | 11 ++++++++-- crawlab/tasks/spider.py | 2 +- crawlab/utils/spider.py | 26 ++++++++++++++++++++++++ frontend/src/i18n/zh.js | 3 +++ frontend/src/views/spider/SpiderList.vue | 17 ++++++++++++++-- 6 files changed, 55 insertions(+), 6 deletions(-) diff --git a/crawlab/db/manager.py b/crawlab/db/manager.py index c1c6d88ab..71ea9b2b0 100644 --- a/crawlab/db/manager.py +++ b/crawlab/db/manager.py @@ -28,7 +28,7 @@ def save(self, col_name: str, item: dict, **kwargs) -> None: if item.get('stats') is not None: item.pop('stats') - col.save(item, **kwargs) + return col.save(item, **kwargs) def remove(self, col_name: str, cond: dict, **kwargs) -> None: """ diff --git a/crawlab/routes/spiders.py b/crawlab/routes/spiders.py index ba315ce9d..157218ee5 100644 --- a/crawlab/routes/spiders.py +++ b/crawlab/routes/spiders.py @@ -21,7 +21,7 @@ from utils import jsonify from utils.deploy import zip_file, unzip_file from utils.file import get_file_suffix_stats, get_file_suffix -from utils.spider import get_lang_by_stats +from utils.spider import 
get_lang_by_stats, get_last_n_run_errors_count, get_last_n_day_tasks_count parser = reqparse.RequestParser() parser.add_argument('file', type=FileStorage, location='files') @@ -106,7 +106,7 @@ def get(self, id=None, action=None): if spider is None: stats = get_file_suffix_stats(dir_path) lang = get_lang_by_stats(stats) - db_manager.save('spiders', { + spider = db_manager.save('spiders', { 'name': dir_name, 'src': dir_path, 'lang': lang, @@ -137,6 +137,13 @@ def get(self, id=None, action=None): 'suffix_stats': stats, }) + # --------- + # stats + # --------- + # last 5-run errors + spider['last_5_errors'] = get_last_n_run_errors_count(spider_id=spider['_id'], n=5) + spider['last_7d_tasks'] = get_last_n_day_tasks_count(spider_id=spider['_id'], n=5) + # append spider items.append(spider) diff --git a/crawlab/tasks/spider.py b/crawlab/tasks/spider.py index ad47b6551..0d843e222 100644 --- a/crawlab/tasks/spider.py +++ b/crawlab/tasks/spider.py @@ -3,7 +3,7 @@ from time import sleep from bson import ObjectId -from pymongo import ASCENDING +from pymongo import ASCENDING, DESCENDING from config import PROJECT_DEPLOY_FILE_FOLDER, PROJECT_LOGS_FOLDER, PYTHON_ENV_PATH from constants.task import TaskStatus diff --git a/crawlab/utils/spider.py b/crawlab/utils/spider.py index 0a45d28f8..6f7d4ef67 100644 --- a/crawlab/utils/spider.py +++ b/crawlab/utils/spider.py @@ -1,6 +1,10 @@ import os +from datetime import datetime, timedelta + +from bson import ObjectId from constants.spider import FILE_SUFFIX_LANG_MAPPING, LangType, SUFFIX_IGNORE, SpiderType +from constants.task import TaskStatus from db.manager import db_manager @@ -43,3 +47,25 @@ def get_spider_col_fields(col_name: str) -> list: for k in item.keys(): fields.add(k) return list(fields) + + +def get_last_n_run_errors_count(spider_id: ObjectId, n: int) -> list: + tasks = db_manager.list(col_name='tasks', + cond={'spider_id': spider_id}, + sort_key='create_ts', + limit=n) + count = 0 + for task in tasks: + if task['status'] == TaskStatus.FAILURE: + count += 1 + return count + + +def get_last_n_day_tasks_count(spider_id: ObjectId, n: int) -> list: + return db_manager.count(col_name='tasks', + cond={ + 'spider_id': spider_id, + 'create_ts': { + '$gte': (datetime.now() - timedelta(n)) + } + }) diff --git a/frontend/src/i18n/zh.js b/frontend/src/i18n/zh.js index 69147f672..13baddc7c 100644 --- a/frontend/src/i18n/zh.js +++ b/frontend/src/i18n/zh.js @@ -86,6 +86,8 @@ export default { 'Variable': '变量', 'Value': '值', 'Add Environment Variables': '添加环境变量', + 'Last 7-Day Tasks': '最近7天任务数', + 'Last 5-Run Errors': '最近5次运行错误数', // 爬虫列表 'Name': '名称', @@ -117,6 +119,7 @@ export default { 'Schedule Name': '定时任务名称', 'Schedule Description': '定时任务描述', 'Parameters': '参数', + 'Add Schedule': '添加定时任务', // 文件 'Choose Folder': '选择文件', diff --git a/frontend/src/views/spider/SpiderList.vue b/frontend/src/views/spider/SpiderList.vue index 60785efe4..14a9ffea7 100644 --- a/frontend/src/views/spider/SpiderList.vue +++ b/frontend/src/views/spider/SpiderList.vue @@ -84,6 +84,17 @@ {{scope.row.lang}} + + + - +