assignment service + executor pool improvments
This commit is contained in:
@@ -56,6 +56,12 @@
|
||||
<artifactId>javax.transaction-api</artifactId>
|
||||
<version>1.2</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.json</groupId>
|
||||
<artifactId>json</artifactId>
|
||||
<version>20210307</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
@@ -21,6 +21,7 @@ public class SelfValidating<T> {
|
||||
* instance.
|
||||
*/
|
||||
protected void validateSelf() {
|
||||
@SuppressWarnings("unchecked")
|
||||
Set<ConstraintViolation<T>> violations = validator.validate((T) this);
|
||||
if (!violations.isEmpty()) {
|
||||
throw new ConstraintViolationException(violations);
|
||||
|
@@ -21,7 +21,7 @@ public class TaskAvailableController {
|
||||
|
||||
@GetMapping(path = "/newtask/{taskType}")
|
||||
public ResponseEntity<String> retrieveTaskFromTaskList(@PathVariable("taskType") String taskType) {
|
||||
|
||||
|
||||
if (ExecutorType.contains(taskType.toUpperCase())) {
|
||||
TaskAvailableCommand command = new TaskAvailableCommand(
|
||||
ExecutorType.valueOf(taskType.toUpperCase()));
|
||||
|
@@ -8,6 +8,9 @@ import java.net.http.HttpResponse;
|
||||
import java.util.HashMap;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import org.json.JSONObject;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
|
||||
import ch.unisg.executorBase.executor.application.port.out.ExecutionFinishedEventPort;
|
||||
@@ -20,39 +23,30 @@ public class ExecutionFinishedEventAdapter implements ExecutionFinishedEventPort
|
||||
|
||||
@Override
|
||||
public void publishExecutionFinishedEvent(ExecutionFinishedEvent event) {
|
||||
///Here we would need to work with DTOs in case the payload of calls becomes more complex
|
||||
|
||||
var values = new HashMap<String, String>() {{
|
||||
put("result",event.getResult());
|
||||
put("status",event.getStatus());
|
||||
}};
|
||||
|
||||
var objectMapper = new ObjectMapper();
|
||||
String requestBody = null;
|
||||
try {
|
||||
requestBody = objectMapper.writeValueAsString(values);
|
||||
} catch (JsonProcessingException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
String body = new JSONObject()
|
||||
.put("taskID", event.getTaskID())
|
||||
.put("result", event.getResult())
|
||||
.put("status", event.getStatus())
|
||||
.toString();
|
||||
|
||||
HttpClient client = HttpClient.newHttpClient();
|
||||
HttpRequest request = HttpRequest.newBuilder()
|
||||
.uri(URI.create(server+"/task/"+event.getTaskID()))
|
||||
.PUT(HttpRequest.BodyPublishers.ofString(requestBody))
|
||||
.header("Content-Type", "application/json")
|
||||
.PUT(HttpRequest.BodyPublishers.ofString(body))
|
||||
.build();
|
||||
|
||||
/** Needs the other service running
|
||||
try {
|
||||
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
} catch (InterruptedException e) {
|
||||
client.send(request, HttpResponse.BodyHandlers.ofString());
|
||||
} catch (IOException | InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
// Restore interrupted state...
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
**/
|
||||
|
||||
System.out.println("Finish execution event sent with result:" + event.getResult());
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@@ -1,8 +1,10 @@
|
||||
package ch.unisg.executorBase.executor.adapter.out.web;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.HttpRequest;
|
||||
import java.net.http.HttpResponse;
|
||||
|
||||
import org.springframework.context.annotation.Primary;
|
||||
import org.springframework.stereotype.Component;
|
||||
@@ -11,6 +13,8 @@ import ch.unisg.executorBase.executor.application.port.out.GetAssignmentPort;
|
||||
import ch.unisg.executorBase.executor.domain.ExecutorType;
|
||||
import ch.unisg.executorBase.executor.domain.Task;
|
||||
|
||||
import org.json.JSONObject;
|
||||
|
||||
@Component
|
||||
@Primary
|
||||
public class GetAssignmentAdapter implements GetAssignmentPort {
|
||||
@@ -19,27 +23,36 @@ public class GetAssignmentAdapter implements GetAssignmentPort {
|
||||
String server = "http://127.0.0.1:8082";
|
||||
|
||||
@Override
|
||||
public Task getAssignment(ExecutorType executorType) {
|
||||
|
||||
public Task getAssignment(ExecutorType executorType, String ip, int port) {
|
||||
|
||||
String body = new JSONObject()
|
||||
.put("executorType", executorType)
|
||||
.put("ip", ip)
|
||||
.put("port", port)
|
||||
.toString();
|
||||
|
||||
HttpClient client = HttpClient.newHttpClient();
|
||||
HttpRequest request = HttpRequest.newBuilder()
|
||||
.uri(URI.create(server+"/assignment/" + executorType))
|
||||
.GET()
|
||||
.uri(URI.create(server+"/task/apply"))
|
||||
.header("Content-Type", "application/json")
|
||||
.POST(HttpRequest.BodyPublishers.ofString(body))
|
||||
.build();
|
||||
|
||||
/** Needs the other service running
|
||||
try {
|
||||
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
} catch (InterruptedException e) {
|
||||
if (response.body().equals("")) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return new Task(new JSONObject(response.body()).getString("taskID"));
|
||||
|
||||
} catch (IOException | InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
// Restore interrupted state...
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
**/
|
||||
|
||||
// TODO return null or a new Task here depending on the response of the http call
|
||||
|
||||
return new Task("123");
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@@ -9,7 +9,7 @@ import lombok.Value;
|
||||
|
||||
@Value
|
||||
public class TaskAvailableCommand extends SelfValidating<TaskAvailableCommand> {
|
||||
|
||||
|
||||
@NotNull
|
||||
private final ExecutorType taskType;
|
||||
|
||||
|
@@ -4,5 +4,5 @@ import ch.unisg.executorBase.executor.domain.ExecutorType;
|
||||
import ch.unisg.executorBase.executor.domain.Task;
|
||||
|
||||
public interface GetAssignmentPort {
|
||||
Task getAssignment(ExecutorType executorType);
|
||||
Task getAssignment(ExecutorType executorType, String ip, int port);
|
||||
}
|
||||
|
@@ -4,13 +4,6 @@ import ch.unisg.executorBase.executor.application.port.out.ExecutionFinishedEven
|
||||
import ch.unisg.executorBase.executor.application.port.out.GetAssignmentPort;
|
||||
import ch.unisg.executorBase.executor.application.port.out.NotifyExecutorPoolPort;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.transaction.Transactional;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Configurable;
|
||||
|
||||
import ch.unisg.executorBase.executor.adapter.out.web.ExecutionFinishedEventAdapter;
|
||||
import ch.unisg.executorBase.executor.adapter.out.web.GetAssignmentAdapter;
|
||||
import ch.unisg.executorBase.executor.adapter.out.web.NotifyExecutorPoolAdapter;
|
||||
@@ -44,23 +37,19 @@ public abstract class ExecutorBase {
|
||||
this.ip = "localhost";
|
||||
this.port = 8084;
|
||||
this.executorType = executorType;
|
||||
|
||||
|
||||
this.status = ExecutorStatus.STARTING_UP;
|
||||
if(!notifyExecutorPoolService.notifyExecutorPool(this.ip, this.port, this.executorType)) {
|
||||
System.exit(0);
|
||||
} else {
|
||||
System.out.println(true);
|
||||
this.status = ExecutorStatus.IDLING;
|
||||
getAssignment();
|
||||
}
|
||||
}
|
||||
|
||||
// public static ExecutorBase getExecutor() {
|
||||
// return executor;
|
||||
// }
|
||||
|
||||
public void getAssignment() {
|
||||
Task newTask = getAssignmentPort.getAssignment(this.getExecutorType());
|
||||
Task newTask = getAssignmentPort.getAssignment(this.getExecutorType(), this.getIp(),
|
||||
this.getPort());
|
||||
if (newTask != null) {
|
||||
this.executeTask(newTask);
|
||||
} else {
|
||||
@@ -71,15 +60,16 @@ public abstract class ExecutorBase {
|
||||
private void executeTask(Task task) {
|
||||
System.out.println("Starting execution");
|
||||
this.status = ExecutorStatus.EXECUTING;
|
||||
|
||||
|
||||
task.setResult(execution());
|
||||
|
||||
executionFinishedEventPort.publishExecutionFinishedEvent(new ExecutionFinishedEvent(task.getTaskID(), task.getResult(), "SUCCESS"));
|
||||
executionFinishedEventPort.publishExecutionFinishedEvent(
|
||||
new ExecutionFinishedEvent(task.getTaskID(), task.getResult(), "SUCCESS"));
|
||||
|
||||
System.out.println("Finish execution");
|
||||
getAssignment();
|
||||
}
|
||||
|
||||
protected abstract String execution();
|
||||
|
||||
|
||||
}
|
||||
|
@@ -1,11 +1,10 @@
|
||||
package ch.unisg.executorBase.executor.domain;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
|
||||
public class Task {
|
||||
|
||||
|
||||
@Getter
|
||||
private String taskID;
|
||||
|
||||
|
Reference in New Issue
Block a user