Persistence mongodb #84

Merged
rahimiankeanu merged 8 commits from Persistence-mongodb into dev 2021-11-21 20:18:15 +00:00
22 changed files with 130 additions and 175 deletions
Showing only changes of commit ef044f1cf1 - Show all commits

View File

@ -49,6 +49,8 @@ services:
restart: unless-stopped
volumes:
- ./:/data/
environment:
mqtt.broker.uri: tcp://broker.hivemq.com:1883
labels:
- "traefik.enable=true"
- "traefik.http.routers.tapas-auction-house.rule=Host(`tapas-auction-house.${PUB_IP}.nip.io`)"
@ -64,6 +66,11 @@ services:
restart: unless-stopped
volumes:
- ./:/data/
environment:
task-list.uri: http://tapas-tasks:8081
executor-robot.uri: http://executor-robot:8084
executor-computation.uri: http://executor-computation:8085
mqtt.broker.uri: tcp://broker.hivemq.com:1883
labels:
- "traefik.enable=true"
- "traefik.http.routers.roster.rule=Host(`roster.${PUB_IP}.nip.io`)"
@ -79,6 +86,8 @@ services:
restart: unless-stopped
volumes:
- ./:/data/
environment:
mqtt.broker.uri: tcp://broker.hivemq.com:1883
labels:
- "traefik.enable=true"
- "traefik.http.routers.executor-pool.rule=Host(`executor-pool.${PUB_IP}.nip.io`)"

View File

@ -57,11 +57,11 @@ public abstract class ExecutorBase {
**/
public void getAssignment() {
Task newTask = getAssignmentPort.getAssignment(this.getExecutorType(), this.getExecutorURI());
System.out.println("New assignment");
System.out.println(newTask);
if (newTask != null) {
logger.info("Executor got a new task");
this.executeTask(newTask);
} else {
logger.info("Executor got no new task");
this.status = ExecutorStatus.IDLING;
}
}

View File

@ -1,6 +1,3 @@
server.port=8081
roster.url=http://127.0.0.1:8082
executor.pool.url=http://127.0.0.1:8083
executor1.url=http://127.0.0.1:8084
executor2.url=http://127.0.0.1:8085
task-list.url=http://127.0.0.1:8081

View File

@ -21,8 +21,6 @@ public class Executor extends ExecutorBase {
protected
String execution(String inputData) {
System.out.println(inputData);
String operator = "";
if (inputData.contains("+")) {
operator = "+";
@ -30,34 +28,34 @@ public class Executor extends ExecutorBase {
operator = "-";
} else if (inputData.contains("*")) {
operator = "*";
} else {
return "invalid data";
}
// System.out.println(operator);
double result = Double.NaN;
// double result = Double.NaN;
// System.out.print(inputData.split("+"));
// int a = Integer.parseInt(inputData.split(operator)[0]);
// int b = Integer.parseInt(inputData.split(operator)[1]);
// // try {
// // TimeUnit.SECONDS.sleep(20);
// // } catch (InterruptedException e) {
// // e.printStackTrace();
// // }
// if (operator.equalsIgnoreCase("+")) {
// result = a + b;
// } else if (operator.equalsIgnoreCase("*")) {
// result = a * b;
// } else if (operator.equalsIgnoreCase("-")) {
// result = a - b;
// try {
// TimeUnit.SECONDS.sleep(5);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.println("Result: " + result);
double result = 0.0;
if (operator.equalsIgnoreCase("+")) {
String[] parts = inputData.split("\\+");
double a = Double.parseDouble(parts[0]);
double b = Double.parseDouble(parts[1]);
result = a + b;
} else if (operator.equalsIgnoreCase("*")) {
String[] parts = inputData.split("\\*");
double a = Double.parseDouble(parts[0]);
double b = Double.parseDouble(parts[1]);
result = a * b;
} else if (operator.equalsIgnoreCase("-")) {
String[] parts = inputData.split("-");
double a = Double.parseDouble(parts[0]);
double b = Double.parseDouble(parts[1]);
result = a - b;
}
return Double.toString(result);
}

View File

@ -1,3 +1,3 @@
server.port=8083
mqtt.broker.uri=tcp://broker.hivemq.com:1883
mqtt.broker.uri=tcp://localhost:1883

View File

@ -6,6 +6,7 @@ import java.util.logging.Logger;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.core.env.ConfigurableEnvironment;
import ch.unisg.roster.roster.adapter.common.clients.TapasMqttClient;
import ch.unisg.roster.roster.adapter.in.messaging.mqtt.ExecutorEventMqttListener;
@ -16,11 +17,12 @@ public class RosterApplication {
static Logger logger = Logger.getLogger(RosterApplication.class.getName());
public static String MQTT_BROKER = "tcp://broker.hivemq.com:1883";
private static ConfigurableEnvironment ENVIRONMENT;
public static void main(String[] args) {
SpringApplication.run(RosterApplication.class, args);
SpringApplication rosterApp = new SpringApplication(RosterApplication.class);
ENVIRONMENT = rosterApp.run(args).getEnvironment();
bootstrapMarketplaceWithMqtt();
}
@ -29,9 +31,11 @@ public class RosterApplication {
* one another
*/
private static void bootstrapMarketplaceWithMqtt() {
String broker = ENVIRONMENT.getProperty("mqtt.broker.uri");
try {
ExecutorEventsMqttDispatcher dispatcher = new ExecutorEventsMqttDispatcher();
TapasMqttClient client = TapasMqttClient.getInstance(MQTT_BROKER, dispatcher);
TapasMqttClient client = TapasMqttClient.getInstance(broker, dispatcher);
client.startReceivingMessages();
} catch (MqttException e) {
logger.log(Level.SEVERE, e.getMessage(), e);

View File

@ -25,7 +25,6 @@ public class TaskCompletedController {
**/
@PostMapping(path = "/task/completed", consumes = {"application/json"})
public ResponseEntity<Void> addNewTaskTaskToTaskList(@RequestBody Task task) {
System.out.println("TEST");
TaskCompletedCommand command = new TaskCompletedCommand(task.getTaskID(),
task.getStatus(), task.getOutputData());

View File

@ -1,63 +0,0 @@
package ch.unisg.roster.roster.adapter.out.web;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.json.JSONArray;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Primary;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import ch.unisg.roster.roster.application.port.out.GetAllExecutorInExecutorPoolByTypePort;
import ch.unisg.roster.roster.domain.valueobject.ExecutorType;
@Component
@Primary
public class GetAllExecutorInExecutorPoolByTypeAdapter implements GetAllExecutorInExecutorPoolByTypePort {
@Value("${executor-pool.url}")
private String server;
/**
* Requests all executor of the give type from the executor-pool and cheks if there is one
* avaialable of this type.
* @return Whether an executor exist or not
**/
@Override
public boolean doesExecutorTypeExist(ExecutorType type) {
Logger logger = Logger.getLogger(PublishNewTaskEventAdapter.class.getName());
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(server + "/executor-pool/GetAllExecutorInExecutorPoolByType/" + type.getValue()))
.header("Content-Type", "application/json")
.GET()
.build();
try {
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() == HttpStatus.OK.value()) {
JSONArray jsonArray = new JSONArray(response.body());
if (jsonArray.length() > 0) {
return true;
}
}
} catch (InterruptedException e) {
logger.log(Level.SEVERE, e.getLocalizedMessage(), e);
Thread.currentThread().interrupt();
} catch (IOException e) {
logger.log(Level.SEVERE, e.getLocalizedMessage(), e);
}
return false;
}
}

View File

@ -19,10 +19,10 @@ import ch.unisg.roster.roster.domain.event.NewTaskEvent;
@Primary
public class PublishNewTaskEventAdapter implements NewTaskEventPort {
@Value("${executor1.url}")
@Value("${executor-robot.uri}")
private String server;
@Value("${executor2.url}")
@Value("${executor-computation.uri}")
private String server2;
Logger logger = Logger.getLogger(PublishNewTaskEventAdapter.class.getName());

View File

@ -20,7 +20,7 @@ import ch.unisg.roster.roster.domain.event.TaskAssignedEvent;
@Primary
public class PublishTaskAssignedEventAdapter implements TaskAssignedEventPort {
@Value("${task-list.url}")
@Value("${task-list.uri}")
private String server;
Logger logger = Logger.getLogger(PublishTaskAssignedEventAdapter.class.getName());
@ -32,26 +32,26 @@ public class PublishTaskAssignedEventAdapter implements TaskAssignedEventPort {
@Override
public void publishTaskAssignedEvent(TaskAssignedEvent event) {
// String body = new JSONObject()
// .put("taskId", event.taskID)
// .toString();
String body = new JSONObject()
.put("taskId", event.taskID)
.toString();
// HttpClient client = HttpClient.newHttpClient();
// HttpRequest request = HttpRequest.newBuilder()
// .uri(URI.create(server + "/tasks/assignTask"))
// .header("Content-Type", "application/task+json")
// .POST(HttpRequest.BodyPublishers.ofString(body))
// .build();
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(server + "/tasks/assignTask"))
.header("Content-Type", "application/task+json")
.POST(HttpRequest.BodyPublishers.ofString(body))
.build();
// try {
// client.send(request, HttpResponse.BodyHandlers.ofString());
// } catch (InterruptedException e) {
// logger.log(Level.SEVERE, e.getLocalizedMessage(), e);
// Thread.currentThread().interrupt();
// } catch (IOException e) {
// logger.log(Level.SEVERE, e.getLocalizedMessage(), e);
// }
try {
client.send(request, HttpResponse.BodyHandlers.ofString());
} catch (InterruptedException e) {
logger.log(Level.SEVERE, e.getLocalizedMessage(), e);
Thread.currentThread().interrupt();
} catch (IOException e) {
logger.log(Level.SEVERE, e.getLocalizedMessage(), e);
}
}
}

View File

@ -20,7 +20,7 @@ import ch.unisg.roster.roster.domain.event.TaskCompletedEvent;
@Primary
public class PublishTaskCompletedEventAdapter implements TaskCompletedEventPort {
@Value("${task-list.url}")
@Value("${task-list.uri}")
private String server;
Logger logger = Logger.getLogger(PublishTaskCompletedEventAdapter.class.getName());
@ -32,22 +32,17 @@ public class PublishTaskCompletedEventAdapter implements TaskCompletedEventPort
@Override
public void publishTaskCompleted(TaskCompletedEvent event) {
System.out.println("PublishTaskCompletedEventAdapter.publishTaskCompleted()");
System.out.print(server);
String body = new JSONObject()
.put("taskId", event.taskID)
.put("status", event.status)
.put("outputData", event.result)
.toString();
System.out.println(event.taskID);
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(server + "/tasks/completeTask/" + event.taskID))
.uri(URI.create(server + "/tasks/completeTask/"))
.header("Content-Type", "application/task+json")
.GET()
.POST(HttpRequest.BodyPublishers.ofString(body))
.build();

View File

@ -1,13 +0,0 @@
package ch.unisg.roster.roster.application.port.out;
import ch.unisg.roster.roster.domain.valueobject.ExecutorType;
public interface GetAllExecutorInExecutorPoolByTypePort {
/**
* Checks if a executor with the given type exist in our executor pool
* @return boolean
**/
boolean doesExecutorTypeExist(ExecutorType type);
}

View File

@ -1,5 +1,5 @@
server.port=8082
executor-pool.url=http://127.0.0.1:8083
executor1.url=http://127.0.0.1:8084
executor2.url=http://127.0.0.1:8085
task-list.url=http://127.0.0.1:8081
executor-robot.uri=http://127.0.0.1:8084
executor-computation.uri=http://127.0.0.1:8085
task-list.uri=http://127.0.0.1:8081
mqtt.broker.uri=tcp://localhost:1883

View File

@ -10,4 +10,4 @@ tasks.list.uri=http://localhost:8081
application.environment=development
auctionhouse.uri=http://localhost:8086
websub.hub.uri=http://localhost:3000
mqtt.broker.uri=tcp://broker.hivemq.com:1883
mqtt.broker.uri=tcp://localhost:1883

View File

@ -78,6 +78,8 @@ public class AddNewTaskToTaskListWebController {
throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, e.getMessage());
} catch (ConstraintViolationException e) {
throw new ResponseStatusException(HttpStatus.BAD_REQUEST, e.getMessage());
} catch (NullPointerException e) {
throw new ResponseStatusException(HttpStatus.BAD_REQUEST, e.getMessage());
}
}
}

View File

@ -26,17 +26,16 @@ public class CompleteTaskWebController {
this.completeTaskUseCase = completeTaskUseCase;
}
@GetMapping(path = "/tasks/completeTask/{taskId}")
public ResponseEntity<String> completeTask (@PathVariable("taskId") String taskId){
@PostMapping(path = "/tasks/completeTask")
public ResponseEntity<String> completeTask (@RequestBody TaskJsonRepresentation payload) {
System.out.println("completeTask");
System.out.println(taskId);
System.out.println(payload.getTaskId());
String taskResult = "0";
try {
CompleteTaskCommand command = new CompleteTaskCommand(
new Task.TaskId(taskId), new Task.OutputData(taskResult)
new Task.TaskId(payload.getTaskId()), new Task.OutputData(payload.getOutputData())
);
Task updateATask = completeTaskUseCase.completeTask(command);

View File

@ -24,10 +24,10 @@ public class TaskAssignedWebController {
}
@PostMapping(path="/tasks/assignTask", consumes= {TaskJsonRepresentation.MEDIA_TYPE})
public ResponseEntity<String> assignTask(@RequestBody Task task){
public ResponseEntity<String> assignTask(@RequestBody TaskJsonRepresentation payload) {
try{
TaskAssignedCommand command = new TaskAssignedCommand(
task.getTaskId()
new Task.TaskId(payload.getTaskId())
);
Task updateATask = taskAssignedUseCase.assignTask(command);

View File

@ -24,25 +24,19 @@ public class AddNewTaskToTaskListService implements AddNewTaskToTaskListUseCase
Task newTask;
System.out.println("TEST:");
System.out.println(command.getInputData().get());
if (command.getOriginalTaskUri().isPresent() && command.getInputData().isPresent()) {
System.out.println("TEST2:");
newTask = taskList.addNewTaskWithNameAndTypeAndOriginalTaskUriAndInputData(command.getTaskName(),
command.getTaskType(), command.getOriginalTaskUri().get(), command.getInputData().get());
} else if (command.getOriginalTaskUri().isPresent()) {
newTask = taskList.addNewTaskWithNameAndTypeAndOriginalTaskUri(command.getTaskName(),
command.getTaskType(), command.getOriginalTaskUri().get());
} else if (command.getOriginalTaskUri().isPresent()) {
newTask = null;
} else if (command.getInputData().isPresent()) {
newTask = taskList.addNewTaskWithNameAndTypeAndInputData(command.getTaskName(),
command.getTaskType(), command.getInputData().get());
} else {
newTask = taskList.addNewTaskWithNameAndType(command.getTaskName(), command.getTaskType());
}
System.out.println("TEST");
System.out.println(newTask.getInputData());
//Here we are using the application service to emit the domain event to the outside of the bounded context.
//This event should be considered as a light-weight "integration event" to communicate with other services.
//Domain events are usually rather "fat". In our implementation we simplify at this point. In general, it is

View File

@ -26,10 +26,10 @@ public class CompleteTaskService implements CompleteTaskUseCase {
Optional<Task> updatedTask = taskList.retrieveTaskById(command.getTaskId());
Task newTask = updatedTask.get();
newTask.taskResult = new TaskResult(command.getOutputData().getValue());
newTask.taskStatus = new TaskStatus(Task.Status.EXECUTED);
newTask.setOutputData(command.getOutputData());
newTask.setTaskStatus(new TaskStatus(Task.Status.EXECUTED));
if (!newTask.getOriginalTaskUri().getValue().equalsIgnoreCase("")) {
if (newTask.getOriginalTaskUri() != null) {
ExternalTaskExecutedEvent event = new ExternalTaskExecutedEvent(
newTask.getTaskId(),
newTask.getOriginalTaskUri(),

View File

@ -24,7 +24,7 @@ public class TaskAssignedService implements TaskAssignedUseCase {
// update the status to assigned
Task updatedTask = task.get();
updatedTask.taskStatus = new TaskStatus(Status.ASSIGNED);
updatedTask.setTaskStatus(new TaskStatus(Status.ASSIGNED));
return updatedTask;
}

View File

@ -22,10 +22,7 @@ public class Task {
private final TaskType taskType;
@Getter @Setter
public TaskStatus taskStatus; // had to make public for CompleteTaskService
@Getter
public TaskResult taskResult; // same as above
private TaskStatus taskStatus;
@Getter
private final OriginalTaskUri originalTaskUri;
@ -39,24 +36,44 @@ public class Task {
@Getter @Setter
private OutputData outputData;
public Task(TaskName taskName, TaskType taskType) {
this.taskName = taskName;
this.taskType = taskType;
this.taskStatus = new TaskStatus(Status.OPEN);
this.taskId = new TaskId(UUID.randomUUID().toString());
this.originalTaskUri = null;
this.inputData = null;
this.outputData = null;
}
public Task(TaskName taskName, TaskType taskType, OriginalTaskUri taskUri) {
this.taskName = taskName;
this.taskType = taskType;
this.taskStatus = new TaskStatus(Status.OPEN);
this.taskId = new TaskId(UUID.randomUUID().toString());
this.taskResult = new TaskResult("");
this.originalTaskUri = taskUri;
this.inputData = null;
this.outputData = null;
}
public Task(TaskName taskName, TaskType taskType, InputData inputData) {
this.taskName = taskName;
this.taskType = taskType;
this.taskStatus = new TaskStatus(Status.OPEN);
this.taskId = new TaskId(UUID.randomUUID().toString());
this.originalTaskUri = null;
this.inputData = inputData;
this.outputData = null;
}
public Task(TaskName taskName, TaskType taskType, OriginalTaskUri taskUri, InputData inputData) {
this.taskName = taskName;
this.taskType = taskType;
this.taskStatus = new TaskStatus(Status.OPEN);
this.taskId = new TaskId(UUID.randomUUID().toString());
this.taskResult = new TaskResult("");
this.originalTaskUri = taskUri;
this.inputData = inputData;
@ -64,9 +81,7 @@ public class Task {
}
protected static Task createTaskWithNameAndType(TaskName name, TaskType type) {
//This is a simple debug message to see that the request has reached the right method in the core
System.out.println("New Task: " + name.getValue() + " " + type.getValue());
return new Task(name, type, null);
return new Task(name, type);
}
protected static Task createTaskWithNameAndTypeAndOriginalTaskUri(TaskName name, TaskType type,
@ -74,6 +89,11 @@ public class Task {
return new Task(name, type, originalTaskUri);
}
protected static Task createTaskWithNameAndTypeAndInputData(TaskName name, TaskType type,
InputData inputData) {
return new Task(name, type, inputData);
}
protected static Task createTaskWithNameAndTypeAndOriginalTaskUriAndInputData(TaskName name, TaskType type,
OriginalTaskUri originalTaskUri, InputData inputData) {
return new Task(name, type, originalTaskUri, inputData);

View File

@ -6,11 +6,15 @@ import lombok.Value;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;
/**This is our aggregate root**/
public class TaskList {
Logger logger = Logger.getLogger(TaskList.class.getName());
@Getter
private final TaskListName taskListName;
@ -48,6 +52,14 @@ public class TaskList {
return newTask;
}
public Task addNewTaskWithNameAndTypeAndInputData(Task.TaskName name, Task.TaskType type,
Task.InputData inputData) {
Task newTask = Task.createTaskWithNameAndTypeAndInputData(name, type, inputData);
this.addNewTaskToList(newTask);
return newTask;
}
public Task addNewTaskWithNameAndTypeAndOriginalTaskUriAndInputData(Task.TaskName name, Task.TaskType type,
Task.OriginalTaskUri originalTaskUri, Task.InputData inputData) {
Task newTask = Task.createTaskWithNameAndTypeAndOriginalTaskUriAndInputData(name, type, originalTaskUri, inputData);
@ -62,8 +74,10 @@ public class TaskList {
//However, we skip this here as it makes the core even more complex (e.g., we have to implement a light-weight
//domain event publisher and subscribers (see "Implementing Domain-Driven Design by V. Vernon, pp. 296ff).
listOfTasks.value.add(newTask);
//This is a simple debug message to see that the task list is growing with each new request
System.out.println("Number of tasks: " + listOfTasks.value.size());
logger.log(Level.INFO, "New task created! Id: " + newTask.getTaskId().getValue() +
" | Name: " + newTask.getTaskName().getValue() +
" | InputData: " + newTask.getInputData().getValue());
logger.log(Level.INFO, "Number of tasks: {0}", listOfTasks.value.size());
}
public Optional<Task> retrieveTaskById(Task.TaskId id) {