Report service fixes #80

Open · wants to merge 1 commit into base: release-7.0.0
12 changes: 12 additions & 0 deletions helmcharts/edbb/charts/report/configs/opa/policies.rego
@@ -5,6 +5,8 @@ import input.attributes.request.http as http_request

x_authenticated_user_token := http_request.headers["x-authenticated-user-token"]

api_key := trim_prefix(http_request.headers.authorization, "Bearer ")

urls_to_action_mapping := {
    "/report/get": "getReport",
    "/report/list": "listReports",
@@ -51,13 +53,23 @@ createReport {
    super.is_an_internal_request
}

createReport {
    [_, payload, _] := io.jwt.decode(api_key)
    payload.iss == "api_admin"
}

deleteReport {
    acls := ["deleteReport"]
    roles := ["REPORT_ADMIN", "ORG_ADMIN"]
    super.acls_check(acls)
    super.role_check(roles)
}

updateReport {
    [_, payload, _] := io.jwt.decode(api_key)
    payload.iss == "api_admin"
}

updateReport {
    acls := ["updateReport"]
    roles := ["REPORT_ADMIN", "ORG_ADMIN"]
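The two new rules grant createReport and updateReport to callers whose Authorization: Bearer token decodes to a payload with iss == "api_admin"; note that OPA's io.jwt.decode only decodes the token, it does not verify its signature. A minimal sketch of exercising that path from Python follows (the host, endpoint path, signing key, and request body are illustrative assumptions, not values taken from this PR):

# Illustrative only: host, endpoint path, signing key, and body are assumptions.
import jwt        # PyJWT
import requests

# Craft a token whose payload carries iss == "api_admin", which is all the rule checks.
token = jwt.encode({"iss": "api_admin"}, "dummy-secret", algorithm="HS256")

resp = requests.post(
    "https://example.org/report/update",           # hypothetical report-service route
    headers={"Authorization": f"Bearer {token}"},  # read by OPA as http_request.headers.authorization
    json={"request": {"report": {"reportid": "sample-report"}}},  # illustrative payload
)
print(resp.status_code)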
5 changes: 4 additions & 1 deletion helmcharts/obsrvbb/Chart.yaml
@@ -49,4 +49,7 @@ dependencies:
    condition: spark.enabled
  - name: postgresql
    condition: postgresql.enabled
    version: 10.16.3
  - name: superset
    version: 0.1.0
    condition: superset.enabled
44 changes: 44 additions & 0 deletions helmcharts/obsrvbb/charts/cronjob/templates/spark-cronjob.yaml
@@ -361,3 +361,47 @@ spec:
          restartPolicy: OnFailure
          serviceAccountName: spark-cronjob-sa

---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: druid-report-processor
  labels:
    app: druid-report-processor
spec:
  schedule: "30 3 * * *"
  concurrencyPolicy: Allow
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 1
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: druid-report-processor
              image: bitnami/kubectl:1.28.4
              imagePullPolicy: IfNotPresent
              command:
                - /bin/sh
                - -c
                - |
                  SPARK_POD=spark-master-0
                  kubectl exec -it $SPARK_POD -- bash -c "python3 /data/analytics/scripts/druid-report-processor.py"
              env:
                - name: SPARK_HOME
                  value: "/data/analytics/spark-3.1.3-bin-hadoop2.7"
                - name: MODELS_HOME
                  value: "/data/analytics/models-2.0"
                - name: DP_LOGS
                  value: "/data/analytics/logs/data-products"
                - name: KAFKA_HOME
                  value: "/opt/bitnami/kafka_2.12-2.8.0"
                - name: KAFKA_TOPIC
                  value: "{{ .Values.global.kafka.topics.analytics_job_queue }}"
                - name: KAFKA_BOOTSTRAP_SERVERS
                  value: "kafka:9092"
                - name: KAFKA_CONSUMER_GROUP
                  value: "druid-report-processor-group"
          restartPolicy: OnFailure
          serviceAccountName: spark-cronjob-sa
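The new CronJob above fires at 03:30 every day and simply execs into the spark-master-0 pod to run druid-report-processor.py. To exercise it outside that schedule, one option is to spawn a one-off Job from the CronJob; a minimal sketch, assuming kubectl is configured for the namespace where the chart is installed:

# Sketch: run the druid-report-processor CronJob once, outside its 03:30 schedule.
# Assumes kubectl is on PATH and already pointed at the right cluster/namespace.
import subprocess

subprocess.run(
    [
        "kubectl", "create", "job",
        "--from=cronjob/druid-report-processor",   # CronJob name from the manifest above
        "druid-report-processor-manual",           # arbitrary name for the one-off Job
    ],
    check=True,
)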
137 changes: 137 additions & 0 deletions druid-report-processor.py (new file)
@@ -0,0 +1,137 @@
#!/usr/bin/env python3

import json
import os
import subprocess
import time
from kafka import KafkaConsumer
from datetime import datetime
import logging


os.environ['SPARK_HOME'] = '/data/analytics/spark-3.1.3-bin-hadoop2.7'
os.environ['MODELS_HOME'] = '/data/analytics/models-2.0'
os.environ['KAFKA_HOME'] = '/opt/bitnami/kafka_2.12-2.8.0'
os.environ['KAFKA_TOPIC'] = '{}.druid.report.job_queue'.format(os.getenv('ENV', 'dev'))
os.environ['KAFKA_BOOTSTRAP_SERVERS'] = 'kafka:9092'
os.environ['DP_LOGS'] = '/data/analytics/logs/data-products'

print('Starting Druid report processor...')
print('environment variables', os.environ)

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(os.path.join(os.getenv('DP_LOGS', ''), 'druid_processor.log')),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


KAFKA_TOPIC = os.getenv('KAFKA_TOPIC')
KAFKA_BOOTSTRAP_SERVERS = os.getenv('KAFKA_BOOTSTRAP_SERVERS')
SPARK_HOME = os.getenv('SPARK_HOME')
MODELS_HOME = os.getenv('MODELS_HOME')
KAFKA_HOME = os.getenv('KAFKA_HOME')


def get_kafka_consumer():
    """Initialize and return Kafka consumer"""
    return KafkaConsumer(
        KAFKA_TOPIC,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        auto_offset_reset='earliest',
        enable_auto_commit=False,
        consumer_timeout_ms=30000,  # 30 seconds timeout
        group_id=os.getenv('KAFKA_CONSUMER_GROUP', 'druid-report-processor-group')
    )

def submit_spark_job(config_json):
    """Submit Spark job and wait for completion"""
    cmd = [
        f"{SPARK_HOME}/bin/spark-submit",
        "--master", "local[*]",
        "--class", "org.ekstep.analytics.job.JobExecutor",
        "--jars", "/data/analytics/models-2.0/lib/*,/data/analytics/models-2.0/analytics-framework-2.0.jar,/data/analytics/models-2.0/scruid_2.12-2.5.0.jar,/data/analytics/models-2.0/batch-models-2.0.jar",
        "/data/analytics/models-2.0/batch-models-2.0.jar",
        "--model", "druid_reports",
        "--config", json.dumps(config_json)
    ]

    logger.info(f"Submitting Spark job with config: {config_json}")

    try:
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True
        )

        stdout, stderr = process.communicate()

        if process.returncode == 0:
            logger.info("Spark job completed successfully")
        else:
            logger.error(f"Spark job failed with return code {process.returncode}")
            logger.error(f"stderr: {stderr}")

        return process.returncode == 0
    except Exception as e:
        logger.error(f"Error submitting Spark job: {str(e)}")
        return False

def process_events():
    """Main function to process Kafka events"""
    consumer = get_kafka_consumer()
    logger.info("Started consuming from Kafka topic")

    try:
        message_processed = False
        for message in consumer:
            message_processed = True
            logger.info(f"Received message: {message.value}")
            event = message.value

            # Log the model type for debugging
            model_type = event.get('model')
            logger.info(f"Event model type: {model_type}")

            # Check if the event is for druid_reports
            if model_type == 'druid_reports':
                logger.info("Received druid_reports event")

                # Extract config from event and submit the Spark job
                config = event.get('config')
                if config:
                    success = submit_spark_job(config)

                    if success:
                        consumer.commit()  # Commit offset only if job succeeds
                        logger.info("Successfully processed event and committed offset")
                    else:
                        logger.error("Failed to process event, will retry on next run")
                        break
                else:
                    logger.error("Event missing config field")
                    consumer.commit()  # Commit invalid events to avoid getting stuck
            else:
                logger.info(f"Skipping event with model type: {model_type}")
                consumer.commit()  # Commit offset for non-matching events

        if not message_processed:
            logger.info("No messages available to process. Exiting.")
    except Exception as e:
        logger.error(f"Error processing events: {str(e)}")
        logger.exception("Full traceback:")
    finally:
        consumer.close()

if __name__ == "__main__":
    logger.info("Starting Druid Processor Service")
    process_events()
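For reference, process_events() only acts on events whose model field is "druid_reports" and which carry a config object; everything else is skipped and its offset committed. A minimal sketch of publishing such an event by hand (the topic, broker, and config values below are illustrative; the real ones come from the environment variables set at the top of the script and from the submitter below):

# Sketch: publish a test event of the shape process_events() expects.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="kafka:9092",                          # illustrative broker
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

event = {
    "model": "druid_reports",      # any other value is skipped and its offset committed
    "config": {                    # handed to submit_spark_job() and passed to spark-submit as --config
        "search": {"type": "none"},
        "model": "org.ekstep.analytics.model.DruidQueryProcessingModel",
        "modelParams": {"modelName": "sample-report_job"},   # illustrative
    },
}
producer.send("dev.druid.report.job_queue", event)           # illustrative topic name
producer.flush()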
160 changes: 160 additions & 0 deletions druid_report_submitter.py (new file)
@@ -0,0 +1,160 @@
#!/usr/bin/env python3

"""
Script to submit druid report jobs.
"""

import os
import sys
import json
import requests
import argparse
from datetime import date
from kafka import KafkaProducer
from kafka.errors import KafkaError

# Common config functions
def init_config():
    """Initialize common configuration"""
    return {
        "kafka_metrics_topic": "telemetry.metrics",
        "kafka_job_queue": "{}.druid.report.job_queue"
    }

# Kafka utility functions
def get_producer(broker_host):
    """Get Kafka producer instance"""
    return KafkaProducer(bootstrap_servers=[broker_host])

def send(broker_host, topic, msg):
    """Send message to Kafka topic"""
    producer = get_producer(broker_host)
    result = producer.send(topic, msg.encode('utf-8'))
    try:
        record_metadata = result.get(timeout=10)
    except KafkaError:
        print('Error sending message to Kafka')
        pass

def push_metrics(broker_host, topic, metric):
    """Push metrics to Kafka"""
    jd = json.dumps(metric)
    send(broker_host, topic, jd)
    pass

def get_config():
    """Initialize configuration"""
    env = os.getenv("ENV", "dev")
    config = init_config()
    kafka_broker = os.getenv("KAFKA_BROKER_HOST", "localhost:9092")
    kafka_topic = config["kafka_job_queue"].format(env)
    return env, kafka_broker, kafka_topic

def get_active_jobs(report_search_base_url, auth_token):
    """Fetch active jobs from the API"""
    url = "{}report/jobs".format(report_search_base_url)
    payload = """{"request": {"filters": {"status": ["ACTIVE"]}}}"""
    headers = {
        'content-type': "application/json; charset=utf-8",
        'cache-control': "no-cache",
        'Authorization': "Bearer " + auth_token
    }
    response = requests.request("POST", url, data=payload, headers=headers)
    print('Active report configurations fetched from the API')
    return response.json()['result']['reports']

def deactivate_job(report_search_base_url, auth_token, report_id):
    """Deactivate a specific job"""
    url = ("{}report/jobs/deactivate/" + report_id).format(report_search_base_url)
    headers = {
        'cache-control': "no-cache",
        'Authorization': "Bearer " + auth_token
    }
    response = requests.request("POST", url, headers=headers)
    return response

def interpolate_config(report_config, replace_list):
    """Interpolate configuration with replacement values"""
    report_config_str = json.dumps(report_config)
    for item in replace_list:
        report_config_str = report_config_str.replace(item["key"], item["value"])
    print('String interpolation for the report config completed')
    return report_config_str

def check_schedule(report_schedule, report_id, interval_slider, deactivate_func):
    """Check if the report should be scheduled based on its configuration"""
    if report_schedule == 'DAILY':
        return True
    elif report_schedule == 'WEEKLY':
        interval_slider = int(interval_slider) if interval_slider is not None else 0
        if interval_slider < 7 and interval_slider >= 0 and date.today().weekday() == interval_slider:
            return True
    elif report_schedule == 'MONTHLY':
        interval_slider = int(interval_slider) + 1 if interval_slider is not None else 1
        if interval_slider < 21 and interval_slider > 0 and date.today().day == interval_slider:
            return True
    elif report_schedule == 'ONCE':
        deactivate_func(report_id)
        return True
    else:
        return False

def submit_job(report_config, kafka_broker, kafka_topic):
    """Submit job to Kafka"""
    report_config = json.loads(report_config)
    report_id = report_config['reportConfig']['id']
    submit_config = json.loads("""{"model":"druid_reports", "config":{"search":{"type":"none"},"model":"org.ekstep.analytics.model.DruidQueryProcessingModel","output":[{"to":"console","params":{"printEvent":false}}],"parallelization":8,"appName":"Druid Query Processor","deviceMapping":false}}""")
    submit_config['config']['modelParams'] = report_config
    submit_config['config']['modelParams']['modelName'] = report_id + "_job"
    send(kafka_broker, kafka_topic, json.dumps(submit_config))
    print('Job submitted to the job manager with config - ', submit_config)
    return

def process_druid_reports(report_search_base_url, auth_token, replace_list="""[{"key":"__store__","value":"azure"},{"key":"__container__","value":"reports"}]"""):
    """Main function to process and submit druid reports"""
    print('Starting the job submitter...')

    # Get configuration
    env, kafka_broker, kafka_topic = get_config()
    replace_list = json.loads(replace_list)

    # Get active jobs
    reports = get_active_jobs(report_search_base_url, auth_token)

    # Process each report
    for report in reports:
        try:
            deactivate_func = lambda report_id: deactivate_job(report_search_base_url, auth_token, report_id)
            if check_schedule(
                report['reportSchedule'].upper(),
                report['reportId'],
                report['config']['reportConfig']['dateRange'].get('intervalSlider'),
                deactivate_func
            ):
                report_config = interpolate_config(report['config'], replace_list)
                submit_job(report_config, kafka_broker, kafka_topic)
        except Exception as e:
            print('ERROR::While submitting druid report', report['reportId'])

    print('Job submission completed...')

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Submit Druid report jobs')
    parser.add_argument('--report_list_jobs_url', required=True, help='Base URL for report jobs API')
    parser.add_argument('--auth_token', required=True, help='Authentication token')
    parser.add_argument('--replace_list', default="""[{"key":"__store__","value":"azure"},{"key":"__container__","value":"reports"}]""",
                        help='JSON string for replacement configurations')

    args = parser.parse_args()
    process_druid_reports(args.report_list_jobs_url, args.auth_token, args.replace_list)

"""
Example usage:

1. Direct Python call:
python druid_report_submitter.py --report_list_jobs_url "https://<domain>/api/data/v1/" --auth_token "<token>"

2. Using bash command (existing usage):
source /data/analytics/venv/bin/activate && python druid_report_submitter.py --report_list_jobs_url "https://<domain>/api/data/v1/" --auth_token "<token>"

"""