Skip to content

Commit

Permalink
Merge pull request #139 from stelin/main
Browse files Browse the repository at this point in the history
Update status
  • Loading branch information
stelin authored Jul 19, 2023
2 parents 27599a1 + 659c260 commit 43436b0
Show file tree
Hide file tree
Showing 28 changed files with 278 additions and 31 deletions.
2 changes: 1 addition & 1 deletion openjob-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>openjob</artifactId>
<groupId>io.openjob</groupId>
<version>1.0.5</version>
<version>1.0.6</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>openjob-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public Thread newThread(@Nonnull Runnable r) {
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
consumerExecutor.allowCoreThreadTimeOut(true);

this.pullExecutor = new ThreadPoolExecutor(
1,
Expand Down
2 changes: 1 addition & 1 deletion openjob-server/openjob-server-admin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>openjob-server</artifactId>
<groupId>io.openjob</groupId>
<version>1.0.5</version>
<version>1.0.6</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>openjob-server-admin</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion openjob-server/openjob-server-cluster/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>openjob-server</artifactId>
<groupId>io.openjob</groupId>
<version>1.0.5</version>
<version>1.0.6</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.openjob.common.request.WorkerDelayStatusRequest;
import io.openjob.common.response.Result;
import io.openjob.common.response.ServerResponse;
import io.openjob.server.cluster.service.WorkerDelayService;
import io.openjob.server.scheduler.service.DelayInstanceService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -20,11 +21,11 @@
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class WorkerDelayInstanceStatusActor extends BaseActor {

private final DelayInstanceService delayInstanceService;
private final WorkerDelayService workerDelayService;

@Autowired
public WorkerDelayInstanceStatusActor(DelayInstanceService delayInstanceService) {
this.delayInstanceService = delayInstanceService;
public WorkerDelayInstanceStatusActor(WorkerDelayService workerDelayService) {
this.workerDelayService = workerDelayService;
}

@Override
Expand All @@ -40,7 +41,7 @@ public Receive createReceive() {
* @param statusRequest statusRequest
*/
public void handleDelayStatus(WorkerDelayStatusRequest statusRequest) {
this.delayInstanceService.handleDelayStatus(statusRequest);
this.workerDelayService.handleDelayStatus(statusRequest);

ServerResponse serverResponse = new ServerResponse(statusRequest.getDeliveryId());
getSender().tell(Result.success(serverResponse), getSelf());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.openjob.server.cluster.executor;

import io.openjob.common.request.WorkerDelayStatusRequest;
import io.openjob.common.task.TaskQueue;
import io.openjob.server.cluster.task.WorkerDelayStatusConsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
* @author stelin [email protected]
* @since 1.0.6
*/
@Slf4j
@Component
public class WorkerDelayStatusExecutor {
private final TaskQueue<WorkerDelayStatusRequest> queue;

/**
* New
*/
public WorkerDelayStatusExecutor() {
this.queue = new TaskQueue<>(0L, 1024);

//Consumer
WorkerDelayStatusConsumer consumer = new WorkerDelayStatusConsumer(
0L,
1,
16,
"Openjob-heartbeat-executor",
50,
"Openjob-heartbeat-consumer",
this.queue
);
consumer.start();
}

/**
* Submit request
*
* @param request request
*/
public void submit(WorkerDelayStatusRequest request) {
try {
this.queue.submit(request);
} catch (InterruptedException e) {
log.error("Worker heartbeat submit failed!", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@

import io.openjob.common.request.WorkerHeartbeatRequest;
import io.openjob.common.task.TaskQueue;
import io.openjob.server.cluster.service.WorkerHeartbeatService;
import io.openjob.server.cluster.task.WorkerHeartConsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
Expand All @@ -27,7 +25,7 @@ public WorkerHeartbeatExecutor() {
WorkerHeartConsumer consumer = new WorkerHeartConsumer(
0L,
1,
8,
16,
"Openjob-heartbeat-executor",
50,
"Openjob-heartbeat-consumer",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.openjob.server.cluster.executor;

import io.openjob.common.request.WorkerJobInstanceStatusRequest;
import io.openjob.common.task.TaskQueue;
import io.openjob.server.cluster.task.WorkerJobInstanceConsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
* @author stelin [email protected]
* @since 1.0.6
*/
@Slf4j
@Component
public class WorkerJobInstanceExecutor {
private final TaskQueue<WorkerJobInstanceStatusRequest> queue;

/**
* New
*/
public WorkerJobInstanceExecutor() {
this.queue = new TaskQueue<>(0L, 1024);

//Consumer
WorkerJobInstanceConsumer consumer = new WorkerJobInstanceConsumer(
0L,
1,
32,
"Openjob-heartbeat-executor",
50,
"Openjob-heartbeat-consumer",
this.queue
);
consumer.start();
}

/**
* Submit request
*
* @param request request
*/
public void submit(WorkerJobInstanceStatusRequest request) {
try {
this.queue.submit(request);
} catch (InterruptedException e) {
log.error("Worker heartbeat submit failed!", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public WorkerTaskLogExecutor() {
WorkerTaskLogConsumer consumer = new WorkerTaskLogConsumer(
0L,
1,
16,
32,
"Openjob-log-executor",
50,
"Openjob-log-consumer",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.openjob.common.request.WorkerJobInstanceLogRequest;
import io.openjob.common.request.WorkerJobInstanceStatusRequest;
import io.openjob.common.util.DateUtil;
import io.openjob.server.cluster.executor.WorkerJobInstanceExecutor;
import io.openjob.server.repository.dao.JobInstanceDAO;
import io.openjob.server.repository.dao.JobInstanceLogDAO;
import io.openjob.server.repository.dao.JobInstanceTaskDAO;
Expand All @@ -28,16 +29,24 @@
@Log4j2
public class JobInstanceService {
private final JobInstanceTaskDAO jobInstanceTaskDAO;

private final JobInstanceLogDAO jobInstanceLogDAO;

private final JobInstanceDAO jobInstanceDAO;
private final WorkerJobInstanceExecutor workerJobInstanceExecutor;

@Autowired
public JobInstanceService(JobInstanceTaskDAO jobInstanceTaskDAO, JobInstanceLogDAO jobInstanceLogDAO, JobInstanceDAO jobInstanceDAO) {
public JobInstanceService(JobInstanceTaskDAO jobInstanceTaskDAO,
JobInstanceLogDAO jobInstanceLogDAO,
JobInstanceDAO jobInstanceDAO,
WorkerJobInstanceExecutor workerJobInstanceExecutor) {
this.jobInstanceTaskDAO = jobInstanceTaskDAO;
this.jobInstanceLogDAO = jobInstanceLogDAO;
this.jobInstanceDAO = jobInstanceDAO;
this.workerJobInstanceExecutor = workerJobInstanceExecutor;
}

@Transactional(rollbackFor = Exception.class, timeout = 1)
public void handleInstanceStatus(WorkerJobInstanceStatusRequest statusRequest) {
this.workerJobInstanceExecutor.submit(statusRequest);
}

/**
Expand All @@ -46,7 +55,7 @@ public JobInstanceService(JobInstanceTaskDAO jobInstanceTaskDAO, JobInstanceLogD
* @param statusRequest status request.
*/
@Transactional(rollbackFor = Exception.class)
public void handleInstanceStatus(WorkerJobInstanceStatusRequest statusRequest) {
public void handleConsumerInstanceStatus(WorkerJobInstanceStatusRequest statusRequest) {
// First page to update job instance status.
if (CommonConstant.FIRST_PAGE.equals(statusRequest.getPage())) {
this.jobInstanceDAO.updateStatusById(statusRequest.getJobInstanceId(), statusRequest.getStatus());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.openjob.server.cluster.service;

import io.openjob.common.request.WorkerDelayStatusRequest;
import io.openjob.server.cluster.executor.WorkerDelayStatusExecutor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
* @author stelin [email protected]
* @since 1.0.6
*/
@Service
public class WorkerDelayService {

private final WorkerDelayStatusExecutor workerDelayStatusExecutor;

@Autowired
public WorkerDelayService(WorkerDelayStatusExecutor workerDelayStatusExecutor) {
this.workerDelayStatusExecutor = workerDelayStatusExecutor;
}

/**
* Handle delay status.
*
* @param workerDelayStatusRequest workerDelayStatusRequest
*/
public void handleDelayStatus(WorkerDelayStatusRequest workerDelayStatusRequest) {
this.workerDelayStatusExecutor.submit(workerDelayStatusRequest);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.openjob.server.cluster.task;

import io.openjob.common.OpenjobSpringContext;
import io.openjob.common.request.WorkerDelayStatusRequest;
import io.openjob.common.task.BaseConsumer;
import io.openjob.common.task.TaskQueue;
import io.openjob.server.scheduler.service.DelayInstanceService;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

/**
* @author stelin [email protected]
* @since 1.0.6
*/
@Slf4j
public class WorkerDelayStatusConsumer extends BaseConsumer<WorkerDelayStatusRequest> {
public WorkerDelayStatusConsumer(Long id,
Integer consumerCoreThreadNum,
Integer consumerMaxThreadNum,
String consumerThreadName,
Integer pollSize,
String pollThreadName,
TaskQueue<WorkerDelayStatusRequest> queues) {
super(id, consumerCoreThreadNum, consumerMaxThreadNum, consumerThreadName, pollSize, pollThreadName, queues, 2000L, 1000L);
}

@Override
public void consume(Long id, List<WorkerDelayStatusRequest> tasks) {
this.consumerExecutor.submit(new WorkerDelayStatusRunnable(tasks));
}

private static class WorkerDelayStatusRunnable implements Runnable {
private final List<WorkerDelayStatusRequest> tasks;

private WorkerDelayStatusRunnable(List<WorkerDelayStatusRequest> tasks) {
this.tasks = tasks;
}

@Override
public void run() {
try {
this.tasks.forEach(r -> OpenjobSpringContext.getBean(DelayInstanceService.class).handleConsumerDelayStatus(r));
} catch (Throwable throwable) {
log.error("Worker delay status consume failed!", throwable);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
import io.openjob.common.task.BaseConsumer;
import io.openjob.common.task.TaskQueue;
import io.openjob.server.cluster.service.WorkerHeartbeatService;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

/**
* @author stelin [email protected]
* @since 1.0.3
*/
@Slf4j
public class WorkerHeartConsumer extends BaseConsumer<WorkerHeartbeatRequest> {

public WorkerHeartConsumer(Long id,
Expand Down Expand Up @@ -41,7 +43,11 @@ private WorkerHeartbeatConsumerRunnable(List<WorkerHeartbeatRequest> tasks) {

@Override
public void run() {
OpenjobSpringContext.getBean(WorkerHeartbeatService.class).batchHeartbeat(this.tasks);
try {
OpenjobSpringContext.getBean(WorkerHeartbeatService.class).batchHeartbeat(this.tasks);
} catch (Throwable throwable) {
log.error("Worker heartbeat failed!", throwable);
}
}
}
}
Loading

0 comments on commit 43436b0

Please sign in to comment.