executorbase restructure + improvments

This commit is contained in:
2021-12-22 19:35:09 +01:00
parent 1153b75322
commit 88623f6327
66 changed files with 573 additions and 950 deletions

View File

@@ -1,21 +0,0 @@
package ch.unisg.executorbase.executor.application.port.in;
import javax.validation.constraints.NotNull;
import ch.unisg.common.validation.SelfValidating;
import ch.unisg.executorbase.executor.domain.ExecutorType;
import lombok.EqualsAndHashCode;
import lombok.Value;
@Value
@EqualsAndHashCode(callSuper=false)
public class TaskAvailableCommand extends SelfValidating<TaskAvailableCommand> {
@NotNull
private final ExecutorType taskType;
public TaskAvailableCommand(ExecutorType taskType) {
this.taskType = taskType;
this.validateSelf();
}
}

View File

@@ -1,5 +0,0 @@
package ch.unisg.executorbase.executor.application.port.in;
public interface TaskAvailableUseCase {
void newTaskAvailable(TaskAvailableCommand command);
}

View File

@@ -1,7 +0,0 @@
package ch.unisg.executorbase.executor.application.port.out;
import ch.unisg.executorbase.executor.domain.ExecutionFinishedEvent;
public interface ExecutionFinishedEventPort {
void publishExecutionFinishedEvent(ExecutionFinishedEvent event);
}

View File

@@ -1,9 +0,0 @@
package ch.unisg.executorbase.executor.application.port.out;
import ch.unisg.common.valueobject.ExecutorURI;
import ch.unisg.executorbase.executor.domain.ExecutorType;
import ch.unisg.executorbase.executor.domain.Task;
public interface GetAssignmentPort {
Task getAssignment(ExecutorType executorType, ExecutorURI executorURI);
}

View File

@@ -1,8 +0,0 @@
package ch.unisg.executorbase.executor.application.port.out;
import ch.unisg.common.valueobject.ExecutorURI;
import ch.unisg.executorbase.executor.domain.ExecutorType;
public interface NotifyExecutorPoolPort {
boolean notifyExecutorPool(ExecutorURI executorURI, ExecutorType executorType);
}

View File

@@ -1,16 +0,0 @@
package ch.unisg.executorbase.executor.application.service;
import ch.unisg.common.valueobject.ExecutorURI;
import ch.unisg.executorbase.executor.application.port.out.NotifyExecutorPoolPort;
import ch.unisg.executorbase.executor.domain.ExecutorType;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public class NotifyExecutorPoolService {
private final NotifyExecutorPoolPort notifyExecutorPoolPort;
public boolean notifyExecutorPool(ExecutorURI executorURI, ExecutorType executorType) {
return notifyExecutorPoolPort.notifyExecutorPool(executorURI, executorType);
}
}

View File

@@ -1,20 +0,0 @@
package ch.unisg.executorbase.executor.application.service;
import org.springframework.stereotype.Component;
import ch.unisg.executorbase.executor.application.port.in.TaskAvailableCommand;
import ch.unisg.executorbase.executor.application.port.in.TaskAvailableUseCase;
import lombok.RequiredArgsConstructor;
import javax.transaction.Transactional;
@RequiredArgsConstructor
@Component
@Transactional
public class TaskAvailableService implements TaskAvailableUseCase {
@Override
public void newTaskAvailable(TaskAvailableCommand command) {
// Placeholder so spring can create a bean, implementation of this service is inside the individual executors
}
}

View File

@@ -1,21 +0,0 @@
package ch.unisg.executorbase.executor.domain;
import lombok.Getter;
public class ExecutionFinishedEvent {
@Getter
private String taskID;
@Getter
private String outputData;
@Getter
private String status;
public ExecutionFinishedEvent(String taskID, String outputData, String status) {
this.taskID = taskID;
this.outputData = outputData;
this.status = status;
}
}

View File

@@ -1,93 +0,0 @@
package ch.unisg.executorbase.executor.domain;
import java.util.logging.Level;
import java.util.logging.Logger;
import ch.unisg.common.valueobject.ExecutorURI;
import ch.unisg.executorbase.executor.adapter.out.web.ExecutionFinishedEventAdapter;
import ch.unisg.executorbase.executor.adapter.out.web.GetAssignmentAdapter;
import ch.unisg.executorbase.executor.adapter.out.web.NotifyExecutorPoolAdapter;
import ch.unisg.executorbase.executor.application.port.out.ExecutionFinishedEventPort;
import ch.unisg.executorbase.executor.application.port.out.GetAssignmentPort;
import ch.unisg.executorbase.executor.application.port.out.NotifyExecutorPoolPort;
import ch.unisg.executorbase.executor.application.service.NotifyExecutorPoolService;
import lombok.Getter;
public abstract class ExecutorBase {
@Getter
private ExecutorURI executorURI;
@Getter
private ExecutorType executorType;
@Getter
private ExecutorStatus status;
// TODO Violation of the Dependency Inversion Principle?,
// TODO do this with only services
private final NotifyExecutorPoolPort notifyExecutorPoolPort = new NotifyExecutorPoolAdapter();
private final NotifyExecutorPoolService notifyExecutorPoolService = new NotifyExecutorPoolService(notifyExecutorPoolPort);
private final GetAssignmentPort getAssignmentPort = new GetAssignmentAdapter();
private final ExecutionFinishedEventPort executionFinishedEventPort = new ExecutionFinishedEventAdapter();
Logger logger = Logger.getLogger(ExecutorBase.class.getName());
protected ExecutorBase(ExecutorType executorType, String uri) {
logger.info("ExecutorBase | Starting Executor");
this.status = ExecutorStatus.STARTING_UP;
this.executorType = executorType;
this.executorURI = new ExecutorURI(uri);
// TODO do this in main
// Notify executor-pool about existence. If executor-pools response is successfull start with getting an assignment, else shut down executor.
logger.info("ExecutorBase | Notifying executor-pool about existens");
if(!notifyExecutorPoolService.notifyExecutorPool(this.executorURI, this.executorType)) {
logger.log(Level.WARNING, "ExecutorBase | Executor could not connect to executor pool! Shuting down!");
System.exit(0);
} else {
logger.info("ExecutorBase | Executor conntected to executor pool");
this.status = ExecutorStatus.IDLING;
getAssignment();
}
}
/**
* Requests a new task from the task queue
* @return void
**/
public void getAssignment() {
Task newTask = getAssignmentPort.getAssignment(this.getExecutorType(), this.getExecutorURI());
if (newTask != null) {
logger.info("ExecutorBase | Executor got a new task");
this.executeTask(newTask);
} else {
logger.info("ExecutorBase | Executor got no new task");
this.status = ExecutorStatus.IDLING;
}
}
/**
* Start the execution of a task
* @return void
**/
private void executeTask(Task task) {
logger.info("ExecutorBase | Starting execution");
this.status = ExecutorStatus.EXECUTING;
task.setOutputData(execution(task.getInputData()));
// TODO implement logic if execution was not successful
executionFinishedEventPort.publishExecutionFinishedEvent(
new ExecutionFinishedEvent(task.getTaskID(), task.getOutputData(), "SUCCESS"));
logger.info("ExecutorBase | Finish execution");
getAssignment();
}
/**
* Implementation of the actual execution method of an executor
* @return the execution result
**/
protected abstract String execution(String input);
}

View File

@@ -1,22 +0,0 @@
package ch.unisg.executorbase.executor.domain;
public enum ExecutorType {
COMPUTATION, SMALLROBOT, HUMIDITY;
/**
* Checks if the give executor type exists.
* @return Whether the given executor type exists
**/
public static boolean contains(String test) {
for (ExecutorType x : ExecutorType.values()) {
if (x.name().equals(test)) {
return true;
}
}
return false;
}
}

View File

@@ -0,0 +1,66 @@
package ch.unisg.executorbase;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import ch.unisg.common.valueobject.ExecutorURI;
import ch.unisg.executorbase.services.GetAssignmentService;
import ch.unisg.executorbase.services.NotifyExecutorPoolService;
import lombok.Getter;
@Component
public class Executor {
@Getter
ExecutorStatus executorStatus = ExecutorStatus.STARTING_UP;
@Getter
@Value("${executor.type}")
String executorType;
@Getter
@Value("${executor.uri}")
ExecutorURI executorUri;
@Autowired
NotifyExecutorPoolService notifyExecutorPoolService;
@Autowired
GetAssignmentService getAssignmentService;
private Logger logger = Logger.getLogger(Executor.class.getName());
public Executor() {
executorStatus = ExecutorStatus.IDLING;
}
public void init() {
if(!notifyExecutorPoolService.executorStarted(this.executorUri, this.executorType)) {
logger.log(Level.WARNING, "ExecutorBase | Executor could not connect to executor pool! Shuting down!");
System.exit(0);
} else {
logger.info("ExecutorBase | Executor conntected to executor pool");
this.setIdling();
getAssignmentService.getAssignment();
}
}
// @PreDestroy
// public void preDestroy() {
// System.out.println("TEST");
// notifyExecutorPoolService.executorStopped(this.executorUri);
// }
public void setIdling() {
this.executorStatus = ExecutorStatus.IDLING;
}
public void setExecuting() {
this.executorStatus = ExecutorStatus.EXECUTING;
}
}

View File

@@ -0,0 +1,27 @@
package ch.unisg.executorbase;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.springframework.beans.factory.annotation.Autowired;
import ch.unisg.executorbase.services.NotifyExecutorPoolService;
public class ExecutorBase {
@Autowired
Executor executor;
@Autowired
NotifyExecutorPoolService notifyExecutorPoolService;
@PostConstruct
private void initialiseRoster(){
executor.init();
}
@PreDestroy
public void test() {
System.out.println("TEST");
}
}

View File

@@ -1,4 +1,4 @@
package ch.unisg.executorbase.executor.domain;
package ch.unisg.executorbase;
public enum ExecutorStatus {
STARTING_UP, // Executor is starting

View File

@@ -1,4 +1,4 @@
package ch.unisg.executorbase.executor.domain;
package ch.unisg.executorbase;
import lombok.Getter;
import lombok.Setter;

View File

@@ -1,7 +1,9 @@
package ch.unisg.executorbase.executor.adapter.in.web;
package ch.unisg.executorbase.controller;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
@@ -9,17 +11,13 @@ import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import ch.unisg.executorbase.executor.application.port.in.TaskAvailableCommand;
import ch.unisg.executorbase.executor.application.port.in.TaskAvailableUseCase;
import ch.unisg.executorbase.executor.domain.ExecutorType;
import ch.unisg.executorbase.services.TaskAvailableService;
@RestController
public class TaskAvailableController {
private final TaskAvailableUseCase taskAvailableUseCase;
public TaskAvailableController(TaskAvailableUseCase taskAvailableUseCase) {
this.taskAvailableUseCase = taskAvailableUseCase;
}
@Autowired
private TaskAvailableService taskAvailableService;
Logger logger = Logger.getLogger(TaskAvailableController.class.getName());
@@ -27,16 +25,12 @@ public class TaskAvailableController {
* Controller for notification about new events.
* @return 200 OK
**/
@GetMapping(path = "/newtask/{taskType}", consumes = { "application/json" })
@GetMapping(path = "/newtask/{taskType}")
public ResponseEntity<String> retrieveTaskFromTaskList(@PathVariable("taskType") String taskType) {
logger.info("ExecutorBase | New " + taskType + " task available");
if (ExecutorType.contains(taskType.toUpperCase())) {
TaskAvailableCommand command = new TaskAvailableCommand(
ExecutorType.valueOf(taskType.toUpperCase()));
taskAvailableUseCase.newTaskAvailable(command);
}
CompletableFuture.runAsync(() -> taskAvailableService.newTaskAvailable(taskType.toUpperCase()));
// Add the content type as a response header
HttpHeaders responseHeaders = new HttpHeaders();

View File

@@ -0,0 +1,42 @@
package ch.unisg.executorbase.services;
import java.util.logging.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import ch.unisg.executorbase.Executor;
import ch.unisg.executorbase.Task;
public abstract class ExecuteTaskServiceBase implements ExecuteTaskServiceInterface {
private Logger logger = Logger.getLogger(ExecuteTaskServiceBase.class.getName());
@Autowired
private Executor executor;
@Autowired
private GetAssignmentService getAssignmentService;
@Autowired
private ExecutionFinishedService executionFinishedService;
/**
* Start the execution of a task
* @return void
**/
public void executeTask(Task task) {
logger.info("ExecutorBase | Starting execution");
executor.setExecuting();
task.setOutputData(execution(task.getInputData()));
// TODO implement logic if execution was not successful
executionFinishedService.publishExecutionFinishedEvent(task.getTaskID(), task.getOutputData(), "SUCCESS");
logger.info("ExecutorBase | Finish execution");
getAssignmentService.getAssignment();
}
}

View File

@@ -0,0 +1,13 @@
package ch.unisg.executorbase.services;
import ch.unisg.executorbase.Task;
public interface ExecuteTaskServiceInterface {
void executeTask(Task task);
/**
* Implementation of the actual execution method of an executor
* @return the execution result
**/
String execution(String input);
}

View File

@@ -1,4 +1,4 @@
package ch.unisg.executorbase.executor.adapter.out.web;
package ch.unisg.executorbase.services;
import java.io.IOException;
import java.net.URI;
@@ -9,36 +9,34 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import ch.unisg.executorbase.executor.application.port.out.ExecutionFinishedEventPort;
import ch.unisg.executorbase.executor.domain.ExecutionFinishedEvent;
@Component
public class ExecutionFinishedService {
public class ExecutionFinishedEventAdapter implements ExecutionFinishedEventPort {
@Value("${roster.uri}")
private String rosterUri;
String server = System.getenv("ROSTER_URI") == null ?
"http://localhost:8082" : System.getenv("ROSTER_URI");
Logger logger = Logger.getLogger(ExecutionFinishedEventAdapter.class.getName());
private Logger logger = Logger.getLogger(ExecutionFinishedService.class.getName());
/**
* Publishes the execution finished event
* @return void
**/
@Override
public void publishExecutionFinishedEvent(ExecutionFinishedEvent event) {
public void publishExecutionFinishedEvent(String taskID, String outputData, String status) {
logger.log(Level.INFO, "ExecutorBase | Sending finish execution event....");
String body = new JSONObject()
.put("taskID", event.getTaskID())
.put("outputData", event.getOutputData())
.put("status", event.getStatus())
.put("taskID", taskID)
.put("outputData", outputData)
.put("status", status)
.toString();
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(server+"/task/completed"))
.uri(URI.create(rosterUri+"/task/completed"))
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(body))
.build();
@@ -53,7 +51,7 @@ public class ExecutionFinishedEventAdapter implements ExecutionFinishedEventPort
logger.log(Level.SEVERE, e.getLocalizedMessage(), e);
}
logger.log(Level.INFO, "ExecutorBase | Finish execution event sent with result: {0}", event.getOutputData());
logger.log(Level.INFO, "ExecutorBase | Finish execution event sent with result: {0}", outputData);
}
}

View File

@@ -1,4 +1,4 @@
package ch.unisg.executorbase.executor.adapter.out.web;
package ch.unisg.executorbase.services;
import java.io.IOException;
import java.net.URI;
@@ -8,42 +8,57 @@ import java.net.http.HttpResponse;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Primary;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import ch.unisg.common.valueobject.ExecutorURI;
import ch.unisg.executorbase.executor.application.port.out.GetAssignmentPort;
import ch.unisg.executorbase.executor.domain.ExecutorType;
import ch.unisg.executorbase.executor.domain.Task;
import org.json.JSONObject;
import ch.unisg.executorbase.Executor;
import ch.unisg.executorbase.Task;
@Component
@Primary
public class GetAssignmentAdapter implements GetAssignmentPort {
public class GetAssignmentService {
String server = System.getenv("ROSTER_URI") == null ?
"http://localhost:8082" : System.getenv("ROSTER_URI");
@Value("${roster.uri}")
String rosterUri;
Logger logger = Logger.getLogger(GetAssignmentAdapter.class.getName());
Logger logger = Logger.getLogger(GetAssignmentService.class.getName());
@Autowired
private Executor executor;
@Autowired
private ExecuteTaskServiceInterface executeTaskService;
public void getAssignment() {
Task newTask = requestAssignment();
if (newTask != null) {
logger.info("ExecutorBase | Executor got a new task");
executeTaskService.executeTask(newTask);
} else {
logger.info("ExecutorBase | Executor got no new task");
executor.setIdling();
}
}
/**
* Requests a new task assignment
* @return the assigned task
* @see Task
**/
@Override
public Task getAssignment(ExecutorType executorType, ExecutorURI executorURI) {
private Task requestAssignment() {
String body = new JSONObject()
.put("executorType", executorType)
.put("executorURI", executorURI.getValue())
.toString();
.put("executorType", executor.getExecutorType())
.put("executorURI", executor.getExecutorUri().getValue())
.toString();
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(server+"/task/apply"))
.uri(URI.create(rosterUri + "/task/apply"))
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(body))
.build();
@@ -72,5 +87,4 @@ public class GetAssignmentAdapter implements GetAssignmentPort {
return null;
}
}

View File

@@ -1,4 +1,4 @@
package ch.unisg.executorbase.executor.adapter.out.web;
package ch.unisg.executorbase.services;
import java.io.IOException;
import java.net.URI;
@@ -9,29 +9,27 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Primary;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import ch.unisg.common.valueobject.ExecutorURI;
import ch.unisg.executorbase.executor.application.port.out.NotifyExecutorPoolPort;
import ch.unisg.executorbase.executor.domain.ExecutorType;
@Component
@Primary
public class NotifyExecutorPoolAdapter implements NotifyExecutorPoolPort {
public class NotifyExecutorPoolService {
String server = System.getenv("EXECUTOR_POOL_URI") == null ?
"http://localhost:8083" : System.getenv("EXECUTOR_POOL_URI");
@Value("${executor.pool.uri}")
String executorPoolUri;
Logger logger = Logger.getLogger(NotifyExecutorPoolAdapter.class.getName());
Logger logger = Logger.getLogger(NotifyExecutorPoolService.class.getName());
/**
* Notifies the executor-pool about the startup of this executor
* @return if the notification was successful
**/
@Override
public boolean notifyExecutorPool(ExecutorURI executorURI, ExecutorType executorType) {
public boolean executorStarted(ExecutorURI executorURI, String executorType) {
String body = new JSONObject()
.put("executorTaskType", executorType)
@@ -40,7 +38,7 @@ public class NotifyExecutorPoolAdapter implements NotifyExecutorPoolPort {
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(server+"/executor-pool/executors"))
.uri(URI.create(executorPoolUri + "/executor-pool/executors"))
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(body))
.build();
@@ -58,6 +56,28 @@ public class NotifyExecutorPoolAdapter implements NotifyExecutorPoolPort {
}
return false;
}
}
/**
* Notifies the executor-pool about the shutdown of this executor
**/
public static void executorStopped(String executorURI) {
System.out.println("TEST");
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("http://localhost:8083" + "/executor-pool/executors/" + executorURI))
// .uri(URI.create(executorPoolUri + "/executor-pool/executors/" + executorURI))
.header("Content-Type", "application/json")
.DELETE()
.build();
try {
client.send(request, HttpResponse.BodyHandlers.ofString());
} catch (InterruptedException e) {
// logger.log(Level.SEVERE, e.getLocalizedMessage(), e);
// Thread.currentThread().interrupt();
} catch (IOException e) {
// logger.log(Level.SEVERE, e.getLocalizedMessage(), e);
}
}
}

View File

@@ -0,0 +1,28 @@
package ch.unisg.executorbase.services;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import ch.unisg.executorbase.Executor;
import ch.unisg.executorbase.ExecutorStatus;
import lombok.RequiredArgsConstructor;
import javax.transaction.Transactional;
@RequiredArgsConstructor
@Component
@Transactional
public class TaskAvailableService {
@Autowired
Executor executor;
@Autowired
GetAssignmentService getAssignmentService;
public void newTaskAvailable(String taskType) {
if (executor.getExecutorStatus() == ExecutorStatus.IDLING && executor.getExecutorType().equalsIgnoreCase(taskType)) {
getAssignmentService.getAssignment();
}
}
}

View File

@@ -1,6 +1,4 @@
server.port=8081
roster.url=http://127.0.0.1:8082
executor.pool.url=http://127.0.0.1:8083
roster.uri=http://localhost:8082
spring.profiles.active=chaos-monkey
chaos.monkey.enabled=false