diff --git a/executor-pool/pom.xml b/executor-pool/pom.xml
index 2e75dcc..512235d 100644
--- a/executor-pool/pom.xml
+++ b/executor-pool/pom.xml
@@ -63,6 +63,12 @@
javax.transaction-api
compile
+
+ org.eclipse.paho
+ org.eclipse.paho.client.mqttv3
+ 1.2.5
+ compile
+
diff --git a/executor-pool/src/main/java/ch/unisg/common/ConfigProperties.java b/executor-pool/src/main/java/ch/unisg/common/ConfigProperties.java
new file mode 100644
index 0000000..253922c
--- /dev/null
+++ b/executor-pool/src/main/java/ch/unisg/common/ConfigProperties.java
@@ -0,0 +1,22 @@
+package ch.unisg.common;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
+import org.springframework.stereotype.Component;
+
+import java.net.URI;
+
+@Component
+public class ConfigProperties {
+ @Autowired
+ private Environment environment;
+
+ /**
+ * Retrieves the URI of the MQTT broker.
+ *
+ * @return the URI of the MQTT broker
+ */
+ public URI getMqttBrokerUri() {
+ return URI.create(environment.getProperty("mqtt.broker.uri"));
+ }
+}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/TestController.java b/executor-pool/src/main/java/ch/unisg/executorpool/TestController.java
deleted file mode 100644
index ca29e09..0000000
--- a/executor-pool/src/main/java/ch/unisg/executorpool/TestController.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package ch.unisg.executorpool;
-
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RestController;
-
-@RestController
-public class TestController {
- @RequestMapping("/")
- public String index() {
- return "Hello World! Executor Pool";
- }
-}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/adapter/common/clients/TapasMqttClient.java b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/common/clients/TapasMqttClient.java
new file mode 100644
index 0000000..0b24b81
--- /dev/null
+++ b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/common/clients/TapasMqttClient.java
@@ -0,0 +1,41 @@
+package ch.unisg.executorpool.adapter.common.clients;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.eclipse.paho.client.mqttv3.*;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+public class TapasMqttClient {
+ private static final Logger LOGGER = LogManager.getLogger(TapasMqttClient.class);
+
+ private static TapasMqttClient tapasClient = null;
+
+ private MqttClient mqttClient;
+ private final String mqttClientId;
+ private final String brokerAddress;
+
+ private TapasMqttClient(String brokerAddress) {
+ this.mqttClientId = UUID.randomUUID().toString();
+ this.brokerAddress = brokerAddress;
+ }
+
+ public static synchronized TapasMqttClient getInstance(String brokerAddress) {
+
+ if (tapasClient == null) {
+ tapasClient = new TapasMqttClient(brokerAddress);
+ }
+
+ return tapasClient;
+ }
+
+ public void publishMessage(String topic, String payload) throws MqttException {
+ mqttClient = new org.eclipse.paho.client.mqttv3.MqttClient(brokerAddress, mqttClientId, new MemoryPersistence());
+ mqttClient.connect();
+ MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
+ mqttClient.publish(topic, message);
+ mqttClient.disconnect();
+ }
+}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/adapter/common/formats/ExecutorJsonRepresentation.java b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/common/formats/ExecutorJsonRepresentation.java
new file mode 100644
index 0000000..3c8f6e4
--- /dev/null
+++ b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/common/formats/ExecutorJsonRepresentation.java
@@ -0,0 +1,51 @@
+package ch.unisg.executorpool.adapter.common.formats;
+
+import ch.unisg.executorpool.domain.ExecutorClass;
+import lombok.Getter;
+import lombok.Setter;
+import org.json.JSONArray;
+import org.json.JSONObject;
+
+import java.util.List;
+
+public class ExecutorJsonRepresentation {
+ public static final String EXECUTOR_MEDIA_TYPE = "application/json";
+
+ @Getter @Setter
+ private String executorUri;
+
+ @Getter @Setter
+ private String executorTaskType;
+
+ // TODO Check if this need Setters. Also applies to AuctionJsonRepresentation
+ public ExecutorJsonRepresentation(String executorUri, String executorTaskType){
+ this.executorUri = executorUri;
+ this.executorTaskType = executorTaskType;
+ }
+
+ public static String serialize(ExecutorClass executorClass) {
+ JSONObject payload = new JSONObject();
+
+ payload.put("executorUri", executorClass.getExecutorUri().getValue());
+ payload.put("executorTaskType", executorClass.getExecutorTaskType().getValue());
+
+ return payload.toString();
+ }
+
+ public static String serialize(List listOfExecutors) {
+ JSONArray jsonArray = new JSONArray();
+
+ for (ExecutorClass executor: listOfExecutors) {
+ JSONObject jsonObject = new JSONObject();
+
+ jsonObject.put("executorUri", executor.getExecutorUri().getValue());
+ jsonObject.put("executorTaskType", executor.getExecutorTaskType().getValue());
+
+ jsonArray.put(jsonObject);
+ }
+
+ return jsonArray.toString();
+ }
+
+ private ExecutorJsonRepresentation() { }
+}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/AddNewExecutorToExecutorPoolWebController.java b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/AddNewExecutorToExecutorPoolWebController.java
index 7967b6b..ff464d3 100644
--- a/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/AddNewExecutorToExecutorPoolWebController.java
+++ b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/AddNewExecutorToExecutorPoolWebController.java
@@ -1,5 +1,7 @@
package ch.unisg.executorpool.adapter.in.web;
+import ch.unisg.executorpool.adapter.common.clients.TapasMqttClient;
+import ch.unisg.executorpool.adapter.common.formats.ExecutorJsonRepresentation;
import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolUseCase;
import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolCommand;
import ch.unisg.executorpool.domain.ExecutorClass;
@@ -11,6 +13,11 @@ import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ResponseStatusException;
import javax.validation.ConstraintViolationException;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+import org.eclipse.paho.client.mqttv3.*;
@RestController
public class AddNewExecutorToExecutorPoolWebController {
@@ -20,19 +27,21 @@ public class AddNewExecutorToExecutorPoolWebController {
this.addNewExecutorToExecutorPoolUseCase = addNewExecutorToExecutorPoolUseCase;
}
- @PostMapping(path = "/executor-pool/AddExecutor", consumes = {ExecutorMediaType.EXECUTOR_MEDIA_TYPE})
- public ResponseEntity addNewExecutorToExecutorPool(@RequestBody ExecutorClass executorClass){
- try{
+ @PostMapping(path = "/executor-pool/AddExecutor", consumes = {ExecutorJsonRepresentation.EXECUTOR_MEDIA_TYPE})
+ public ResponseEntity addNewExecutorToExecutorPool(@RequestBody ExecutorJsonRepresentation payload){
+ try {
AddNewExecutorToExecutorPoolCommand command = new AddNewExecutorToExecutorPoolCommand(
- executorClass.getExecutorIp(), executorClass.getExecutorPort(), executorClass.getExecutorTaskType()
+ new ExecutorClass.ExecutorUri(URI.create(payload.getExecutorUri())),
+ new ExecutorClass.ExecutorTaskType(payload.getExecutorTaskType())
);
ExecutorClass newExecutor = addNewExecutorToExecutorPoolUseCase.addNewExecutorToExecutorPool(command);
HttpHeaders responseHeaders = new HttpHeaders();
- responseHeaders.add(HttpHeaders.CONTENT_TYPE, ExecutorMediaType.EXECUTOR_MEDIA_TYPE);
+ responseHeaders.add(HttpHeaders.CONTENT_TYPE, ExecutorJsonRepresentation.EXECUTOR_MEDIA_TYPE);
+
+ return new ResponseEntity<>(ExecutorJsonRepresentation.serialize(newExecutor), responseHeaders, HttpStatus.CREATED);
- return new ResponseEntity<>(ExecutorMediaType.serialize(newExecutor), responseHeaders, HttpStatus.CREATED);
} catch (ConstraintViolationException e){
throw new ResponseStatusException(HttpStatus.BAD_REQUEST, e.getMessage());
}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/ExecutorMediaType.java b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/ExecutorMediaType.java
deleted file mode 100644
index 0ca4e1f..0000000
--- a/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/ExecutorMediaType.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package ch.unisg.executorpool.adapter.in.web;
-
-import ch.unisg.executorpool.domain.ExecutorClass;
-import org.json.JSONArray;
-import org.json.JSONObject;
-
-import java.util.List;
-
-final public class ExecutorMediaType {
- public static final String EXECUTOR_MEDIA_TYPE = "application/json";
-
- public static String serialize(ExecutorClass executorClass) {
- JSONObject payload = new JSONObject();
-
- payload.put("executorIp", executorClass.getExecutorIp().getValue());
- payload.put("executorPort", executorClass.getExecutorPort().getValue());
- payload.put("executorTaskType", executorClass.getExecutorTaskType().getValue());
-
- return payload.toString();
- }
-
- public static String serialize(List listOfExecutors) {
- String serializedList = "[ \n";
-
- for (ExecutorClass executor: listOfExecutors) {
- serializedList += serialize(executor) + "\n";
- }
-
- // return serializedList + "\n ]";
- JSONArray jsonArray = new JSONArray();
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("executorIp", "localhost");
- jsonArray.put(jsonObject);
- return jsonArray.toString();
- }
-
- private ExecutorMediaType() { }
-}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/GetAllExecutorInExecutorPoolByTypeWebController.java b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/GetAllExecutorInExecutorPoolByTypeWebController.java
deleted file mode 100644
index 2595781..0000000
--- a/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/GetAllExecutorInExecutorPoolByTypeWebController.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package ch.unisg.executorpool.adapter.in.web;
-
-import ch.unisg.executorpool.application.port.in.GetAllExecutorInExecutorPoolByTypeQuery;
-import ch.unisg.executorpool.application.port.in.GetAllExecutorInExecutorPoolByTypeUseCase;
-import ch.unisg.executorpool.domain.ExecutorClass;
-import ch.unisg.executorpool.domain.ExecutorClass.ExecutorTaskType;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.RestController;
-
-import java.util.List;
-
-@RestController
-public class GetAllExecutorInExecutorPoolByTypeWebController {
- private final GetAllExecutorInExecutorPoolByTypeUseCase getAllExecutorInExecutorPoolByTypeUseCase;
-
- public GetAllExecutorInExecutorPoolByTypeWebController(GetAllExecutorInExecutorPoolByTypeUseCase getAllExecutorInExecutorPoolByTypeUseCase){
- this.getAllExecutorInExecutorPoolByTypeUseCase = getAllExecutorInExecutorPoolByTypeUseCase;
- }
-
- @GetMapping(path = "/executor-pool/GetAllExecutorInExecutorPoolByType/{taskType}")
- public ResponseEntity getAllExecutorInExecutorPoolByType(@PathVariable("taskType") String taskType){
- GetAllExecutorInExecutorPoolByTypeQuery query = new GetAllExecutorInExecutorPoolByTypeQuery(new ExecutorTaskType(taskType));
- List matchedExecutors = getAllExecutorInExecutorPoolByTypeUseCase.getAllExecutorInExecutorPoolByType(query);
-
- // Add the content type as a response header
- HttpHeaders responseHeaders = new HttpHeaders();
- responseHeaders.add(HttpHeaders.CONTENT_TYPE, ExecutorMediaType.EXECUTOR_MEDIA_TYPE);
-
- return new ResponseEntity<>(ExecutorMediaType.serialize(matchedExecutors), responseHeaders, HttpStatus.OK);
- }
-}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/GetAllExecutorsInExecutorPoolByTypeWebController.java b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/GetAllExecutorsInExecutorPoolByTypeWebController.java
new file mode 100644
index 0000000..8c7ce3d
--- /dev/null
+++ b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/GetAllExecutorsInExecutorPoolByTypeWebController.java
@@ -0,0 +1,36 @@
+package ch.unisg.executorpool.adapter.in.web;
+
+import ch.unisg.executorpool.adapter.common.formats.ExecutorJsonRepresentation;
+import ch.unisg.executorpool.application.port.in.GetAllExecutorsInExecutorPoolByTypeQuery;
+import ch.unisg.executorpool.application.port.in.GetAllExecutorsInExecutorPoolByTypeUseCase;
+import ch.unisg.executorpool.domain.ExecutorClass;
+import ch.unisg.executorpool.domain.ExecutorClass.ExecutorTaskType;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.List;
+
+@RestController
+public class GetAllExecutorsInExecutorPoolByTypeWebController {
+ private final GetAllExecutorsInExecutorPoolByTypeUseCase getAllExecutorsInExecutorPoolByTypeUseCase;
+
+ public GetAllExecutorsInExecutorPoolByTypeWebController(GetAllExecutorsInExecutorPoolByTypeUseCase getAllExecutorInExecutorPoolByTypeUseCase){
+ this.getAllExecutorsInExecutorPoolByTypeUseCase = getAllExecutorInExecutorPoolByTypeUseCase;
+ }
+
+ @GetMapping(path = "/executor-pool/GetAllExecutorsInExecutorPoolByType/{taskType}")
+ public ResponseEntity getAllExecutorInExecutorPoolByType(@PathVariable("taskType") String taskType){
+ GetAllExecutorsInExecutorPoolByTypeQuery query = new GetAllExecutorsInExecutorPoolByTypeQuery(new ExecutorTaskType(taskType));
+ List matchedExecutors = getAllExecutorsInExecutorPoolByTypeUseCase.getAllExecutorsInExecutorPoolByType(query);
+
+ // Add the content type as a response header
+ HttpHeaders responseHeaders = new HttpHeaders();
+ responseHeaders.add(HttpHeaders.CONTENT_TYPE, ExecutorJsonRepresentation.EXECUTOR_MEDIA_TYPE);
+
+ return new ResponseEntity<>(ExecutorJsonRepresentation.serialize(matchedExecutors), responseHeaders, HttpStatus.OK);
+ }
+}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/GetAllExecutorsInExecutorPoolWebController.java b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/GetAllExecutorsInExecutorPoolWebController.java
index 70a5fd2..13a631a 100644
--- a/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/GetAllExecutorsInExecutorPoolWebController.java
+++ b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/GetAllExecutorsInExecutorPoolWebController.java
@@ -1,5 +1,6 @@
package ch.unisg.executorpool.adapter.in.web;
+import ch.unisg.executorpool.adapter.common.formats.ExecutorJsonRepresentation;
import ch.unisg.executorpool.application.port.in.GetAllExecutorsInExecutorPoolUseCase;
import ch.unisg.executorpool.domain.ExecutorClass;
import org.springframework.http.HttpHeaders;
@@ -24,8 +25,8 @@ public class GetAllExecutorsInExecutorPoolWebController {
// Add the content type as a response header
HttpHeaders responseHeaders = new HttpHeaders();
- responseHeaders.add(HttpHeaders.CONTENT_TYPE, ExecutorMediaType.EXECUTOR_MEDIA_TYPE);
+ responseHeaders.add(HttpHeaders.CONTENT_TYPE, ExecutorJsonRepresentation.EXECUTOR_MEDIA_TYPE);
- return new ResponseEntity<>(ExecutorMediaType.serialize(executorClassList), responseHeaders, HttpStatus.OK);
+ return new ResponseEntity<>(ExecutorJsonRepresentation.serialize(executorClassList), responseHeaders, HttpStatus.OK);
}
}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/RemoveExecutorFromExecutorPoolWebController.java b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/RemoveExecutorFromExecutorPoolWebController.java
index 69bbde3..28c3511 100644
--- a/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/RemoveExecutorFromExecutorPoolWebController.java
+++ b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/RemoveExecutorFromExecutorPoolWebController.java
@@ -1,5 +1,6 @@
package ch.unisg.executorpool.adapter.in.web;
+import ch.unisg.executorpool.adapter.common.formats.ExecutorJsonRepresentation;
import ch.unisg.executorpool.application.port.in.RemoveExecutorFromExecutorPoolCommand;
import ch.unisg.executorpool.application.port.in.RemoveExecutorFromExecutorPoolUseCase;
import ch.unisg.executorpool.domain.ExecutorClass;
@@ -11,6 +12,7 @@ import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ResponseStatusException;
+import java.net.URI;
import java.util.Optional;
@RestController
@@ -21,9 +23,11 @@ public class RemoveExecutorFromExecutorPoolWebController {
this.removeExecutorFromExecutorPoolUseCase = removeExecutorFromExecutorPoolUseCase;
}
- @PostMapping(path = "/executor-pool/RemoveExecutor", consumes = {ExecutorMediaType.EXECUTOR_MEDIA_TYPE})
- public ResponseEntity removeExecutorFromExecutorPool(@RequestBody ExecutorClass executorClass){
- RemoveExecutorFromExecutorPoolCommand command = new RemoveExecutorFromExecutorPoolCommand(executorClass.getExecutorIp(), executorClass.getExecutorPort());
+ @PostMapping(path = "/executor-pool/RemoveExecutor", consumes = {ExecutorJsonRepresentation.EXECUTOR_MEDIA_TYPE})
+ public ResponseEntity removeExecutorFromExecutorPool(@RequestBody ExecutorJsonRepresentation executorJsonRepresentation){
+ RemoveExecutorFromExecutorPoolCommand command = new RemoveExecutorFromExecutorPoolCommand(
+ new ExecutorClass.ExecutorUri(URI.create(executorJsonRepresentation.getExecutorUri()))
+ );
Optional removedExecutor = removeExecutorFromExecutorPoolUseCase.removeExecutorFromExecutorPool(command);
if(removedExecutor.isEmpty()){
@@ -31,9 +35,9 @@ public class RemoveExecutorFromExecutorPoolWebController {
}
HttpHeaders responseHeaders = new HttpHeaders();
- responseHeaders.add(HttpHeaders.CONTENT_TYPE, ExecutorMediaType.EXECUTOR_MEDIA_TYPE);
+ responseHeaders.add(HttpHeaders.CONTENT_TYPE, ExecutorJsonRepresentation.EXECUTOR_MEDIA_TYPE);
- return new ResponseEntity<>(ExecutorMediaType.serialize(removedExecutor.get()), responseHeaders,
+ return new ResponseEntity<>(ExecutorJsonRepresentation.serialize(removedExecutor.get()), responseHeaders,
HttpStatus.OK);
}
}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/adapter/out/messaging/PublishExecutorAddedEventAdapter.java b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/out/messaging/PublishExecutorAddedEventAdapter.java
new file mode 100644
index 0000000..323bcbb
--- /dev/null
+++ b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/out/messaging/PublishExecutorAddedEventAdapter.java
@@ -0,0 +1,43 @@
+package ch.unisg.executorpool.adapter.out.messaging;
+
+import ch.unisg.common.ConfigProperties;
+import ch.unisg.executorpool.adapter.common.clients.TapasMqttClient;
+import ch.unisg.executorpool.adapter.common.formats.ExecutorJsonRepresentation;
+import ch.unisg.executorpool.application.port.out.ExecutorAddedEventPort;
+import ch.unisg.executorpool.domain.ExecutorAddedEvent;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Primary;
+import org.springframework.core.env.Environment;
+import org.springframework.stereotype.Component;
+
+import java.net.URI;
+
+@Component
+@Primary
+public class PublishExecutorAddedEventAdapter implements ExecutorAddedEventPort {
+
+ private static final Logger LOGGER = LogManager.getLogger(PublishExecutorAddedEventAdapter.class);
+
+ // TODO Can't autowire. Find fix
+ /*
+ @Autowired
+ private ConfigProperties config;
+ */
+
+ @Autowired
+ private Environment environment;
+
+ @Override
+ public void publishExecutorAddedEvent(ExecutorAddedEvent event){
+ try{
+ var mqttClient = TapasMqttClient.getInstance(environment.getProperty("mqtt.broker.uri"));
+ mqttClient.publishMessage("ch/unisg/tapas/executors/added", ExecutorJsonRepresentation.serialize(event.getExecutorClass()));
+ }
+ catch (MqttException e){
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/adapter/out/messaging/PublishExecutorRemovedEventAdapter.java b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/out/messaging/PublishExecutorRemovedEventAdapter.java
new file mode 100644
index 0000000..aa01165
--- /dev/null
+++ b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/out/messaging/PublishExecutorRemovedEventAdapter.java
@@ -0,0 +1,41 @@
+package ch.unisg.executorpool.adapter.out.messaging;
+
+import ch.unisg.executorpool.adapter.common.clients.TapasMqttClient;
+import ch.unisg.executorpool.adapter.common.formats.ExecutorJsonRepresentation;
+import ch.unisg.executorpool.application.port.out.ExecutorRemovedEventPort;
+import ch.unisg.executorpool.domain.ExecutorAddedEvent;
+import ch.unisg.executorpool.domain.ExecutorRemovedEvent;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Primary;
+import org.springframework.core.env.Environment;
+import org.springframework.stereotype.Component;
+
+@Component
+@Primary
+public class PublishExecutorRemovedEventAdapter implements ExecutorRemovedEventPort {
+
+ private static final Logger LOGGER = LogManager.getLogger(PublishExecutorAddedEventAdapter.class);
+
+ // TODO Can't autowire. Find fix
+ /*
+ @Autowired
+ private ConfigProperties config;
+ */
+
+ @Autowired
+ private Environment environment;
+
+ @Override
+ public void publishExecutorRemovedEvent(ExecutorRemovedEvent event){
+ try{
+ var mqttClient = TapasMqttClient.getInstance(environment.getProperty("mqtt.broker.uri"));
+ mqttClient.publishMessage("ch/unisg/tapas/executors/removed", ExecutorJsonRepresentation.serialize(event.getExecutorClass()));
+ }
+ catch (MqttException e){
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/application/port/in/AddNewExecutorToExecutorPoolCommand.java b/executor-pool/src/main/java/ch/unisg/executorpool/application/port/in/AddNewExecutorToExecutorPoolCommand.java
index 2682610..ddd7da9 100644
--- a/executor-pool/src/main/java/ch/unisg/executorpool/application/port/in/AddNewExecutorToExecutorPoolCommand.java
+++ b/executor-pool/src/main/java/ch/unisg/executorpool/application/port/in/AddNewExecutorToExecutorPoolCommand.java
@@ -2,8 +2,7 @@ package ch.unisg.executorpool.application.port.in;
import ch.unisg.common.SelfValidating;
import ch.unisg.executorpool.domain.ExecutorPool;
-import ch.unisg.executorpool.domain.ExecutorClass.ExecutorIp;
-import ch.unisg.executorpool.domain.ExecutorClass.ExecutorPort;
+import ch.unisg.executorpool.domain.ExecutorClass.ExecutorUri;
import ch.unisg.executorpool.domain.ExecutorClass.ExecutorTaskType;
import lombok.Value;
import javax.validation.constraints.NotNull;
@@ -11,17 +10,13 @@ import javax.validation.constraints.NotNull;
@Value
public class AddNewExecutorToExecutorPoolCommand extends SelfValidating {
@NotNull
- private final ExecutorIp executorIp;
-
- @NotNull
- private final ExecutorPort executorPort;
+ private final ExecutorUri executorUri;
@NotNull
private final ExecutorTaskType executorTaskType;
- public AddNewExecutorToExecutorPoolCommand(ExecutorIp executorIp, ExecutorPort executorPort, ExecutorTaskType executorTaskType){
- this.executorIp = executorIp;
- this.executorPort = executorPort;
+ public AddNewExecutorToExecutorPoolCommand(ExecutorUri executorUri, ExecutorTaskType executorTaskType){
+ this.executorUri = executorUri;
this.executorTaskType = executorTaskType;
this.validateSelf();
}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/application/port/in/GetAllExecutorInExecutorPoolByTypeUseCase.java b/executor-pool/src/main/java/ch/unisg/executorpool/application/port/in/GetAllExecutorInExecutorPoolByTypeUseCase.java
deleted file mode 100644
index 9f612bf..0000000
--- a/executor-pool/src/main/java/ch/unisg/executorpool/application/port/in/GetAllExecutorInExecutorPoolByTypeUseCase.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package ch.unisg.executorpool.application.port.in;
-
-import ch.unisg.executorpool.domain.ExecutorClass;
-
-import java.util.List;
-
-public interface GetAllExecutorInExecutorPoolByTypeUseCase {
- List getAllExecutorInExecutorPoolByType(GetAllExecutorInExecutorPoolByTypeQuery query);
-}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/application/port/in/GetAllExecutorInExecutorPoolByTypeQuery.java b/executor-pool/src/main/java/ch/unisg/executorpool/application/port/in/GetAllExecutorsInExecutorPoolByTypeQuery.java
similarity index 64%
rename from executor-pool/src/main/java/ch/unisg/executorpool/application/port/in/GetAllExecutorInExecutorPoolByTypeQuery.java
rename to executor-pool/src/main/java/ch/unisg/executorpool/application/port/in/GetAllExecutorsInExecutorPoolByTypeQuery.java
index c812eab..079e7e1 100644
--- a/executor-pool/src/main/java/ch/unisg/executorpool/application/port/in/GetAllExecutorInExecutorPoolByTypeQuery.java
+++ b/executor-pool/src/main/java/ch/unisg/executorpool/application/port/in/GetAllExecutorsInExecutorPoolByTypeQuery.java
@@ -7,11 +7,11 @@ import lombok.Value;
import javax.validation.constraints.NotNull;
@Value
-public class GetAllExecutorInExecutorPoolByTypeQuery extends SelfValidating {
+public class GetAllExecutorsInExecutorPoolByTypeQuery extends SelfValidating {
@NotNull
private final ExecutorTaskType executorTaskType;
- public GetAllExecutorInExecutorPoolByTypeQuery(ExecutorTaskType executorTaskType){
+ public GetAllExecutorsInExecutorPoolByTypeQuery(ExecutorTaskType executorTaskType){
this.executorTaskType = executorTaskType;
this.validateSelf();
}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/application/port/in/GetAllExecutorsInExecutorPoolByTypeUseCase.java b/executor-pool/src/main/java/ch/unisg/executorpool/application/port/in/GetAllExecutorsInExecutorPoolByTypeUseCase.java
new file mode 100644
index 0000000..4821284
--- /dev/null
+++ b/executor-pool/src/main/java/ch/unisg/executorpool/application/port/in/GetAllExecutorsInExecutorPoolByTypeUseCase.java
@@ -0,0 +1,9 @@
+package ch.unisg.executorpool.application.port.in;
+
+import ch.unisg.executorpool.domain.ExecutorClass;
+
+import java.util.List;
+
+public interface GetAllExecutorsInExecutorPoolByTypeUseCase {
+ List getAllExecutorsInExecutorPoolByType(GetAllExecutorsInExecutorPoolByTypeQuery query);
+}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/application/port/in/RemoveExecutorFromExecutorPoolCommand.java b/executor-pool/src/main/java/ch/unisg/executorpool/application/port/in/RemoveExecutorFromExecutorPoolCommand.java
index 11763a9..162426c 100644
--- a/executor-pool/src/main/java/ch/unisg/executorpool/application/port/in/RemoveExecutorFromExecutorPoolCommand.java
+++ b/executor-pool/src/main/java/ch/unisg/executorpool/application/port/in/RemoveExecutorFromExecutorPoolCommand.java
@@ -1,9 +1,7 @@
package ch.unisg.executorpool.application.port.in;
-import ch.unisg.executorpool.domain.ExecutorClass;
import ch.unisg.common.SelfValidating;
-import ch.unisg.executorpool.domain.ExecutorClass.ExecutorIp;
-import ch.unisg.executorpool.domain.ExecutorClass.ExecutorPort;
+import ch.unisg.executorpool.domain.ExecutorClass.ExecutorUri;
import lombok.Value;
import javax.validation.constraints.NotNull;
@@ -11,14 +9,10 @@ import javax.validation.constraints.NotNull;
@Value
public class RemoveExecutorFromExecutorPoolCommand extends SelfValidating {
@NotNull
- private final ExecutorIp executorIp;
+ private final ExecutorUri executorUri;
- @NotNull
- private final ExecutorPort executorPort;
-
- public RemoveExecutorFromExecutorPoolCommand(ExecutorIp executorIp, ExecutorPort executorPort){
- this.executorIp = executorIp;
- this.executorPort = executorPort;
+ public RemoveExecutorFromExecutorPoolCommand(ExecutorUri executorUri){
+ this.executorUri = executorUri;
this.validateSelf();
}
}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/application/port/out/ExecutorAddedEventPort.java b/executor-pool/src/main/java/ch/unisg/executorpool/application/port/out/ExecutorAddedEventPort.java
new file mode 100644
index 0000000..ad75c75
--- /dev/null
+++ b/executor-pool/src/main/java/ch/unisg/executorpool/application/port/out/ExecutorAddedEventPort.java
@@ -0,0 +1,8 @@
+package ch.unisg.executorpool.application.port.out;
+
+import ch.unisg.executorpool.domain.ExecutorAddedEvent;
+import org.eclipse.paho.client.mqttv3.MqttException;
+
+public interface ExecutorAddedEventPort {
+ void publishExecutorAddedEvent(ExecutorAddedEvent event);
+}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/application/port/out/ExecutorRemovedEventPort.java b/executor-pool/src/main/java/ch/unisg/executorpool/application/port/out/ExecutorRemovedEventPort.java
new file mode 100644
index 0000000..b905858
--- /dev/null
+++ b/executor-pool/src/main/java/ch/unisg/executorpool/application/port/out/ExecutorRemovedEventPort.java
@@ -0,0 +1,7 @@
+package ch.unisg.executorpool.application.port.out;
+
+import ch.unisg.executorpool.domain.ExecutorRemovedEvent;
+
+public interface ExecutorRemovedEventPort {
+ void publishExecutorRemovedEvent(ExecutorRemovedEvent event);
+}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/application/service/AddNewExecutorToExecutorPoolService.java b/executor-pool/src/main/java/ch/unisg/executorpool/application/service/AddNewExecutorToExecutorPoolService.java
index e1ef237..393024a 100644
--- a/executor-pool/src/main/java/ch/unisg/executorpool/application/service/AddNewExecutorToExecutorPoolService.java
+++ b/executor-pool/src/main/java/ch/unisg/executorpool/application/service/AddNewExecutorToExecutorPoolService.java
@@ -2,24 +2,33 @@ package ch.unisg.executorpool.application.service;
import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolUseCase;
import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolCommand;
+import ch.unisg.executorpool.application.port.out.ExecutorAddedEventPort;
+import ch.unisg.executorpool.domain.ExecutorAddedEvent;
import ch.unisg.executorpool.domain.ExecutorClass;
import ch.unisg.executorpool.domain.ExecutorPool;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
-import org.yaml.snakeyaml.constructor.DuplicateKeyException;
import javax.transaction.Transactional;
-import javax.validation.ConstraintViolationException;
-@RequiredArgsConstructor
@Component
@Transactional
public class AddNewExecutorToExecutorPoolService implements AddNewExecutorToExecutorPoolUseCase {
+ private final ExecutorAddedEventPort executorAddedEventPort;
+
+ public AddNewExecutorToExecutorPoolService(ExecutorAddedEventPort executorAddedEventPort){
+ this.executorAddedEventPort = executorAddedEventPort;
+ }
+
@Override
public ExecutorClass addNewExecutorToExecutorPool(AddNewExecutorToExecutorPoolCommand command){
ExecutorPool executorPool = ExecutorPool.getExecutorPool();
+ var newExecutor = executorPool.addNewExecutor(command.getExecutorUri(), command.getExecutorTaskType());
- return executorPool.addNewExecutor(command.getExecutorIp(), command.getExecutorPort(), command.getExecutorTaskType());
+ var executorAddedEvent = new ExecutorAddedEvent(newExecutor);
+ executorAddedEventPort.publishExecutorAddedEvent(executorAddedEvent);
+
+ return newExecutor;
}
}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/application/service/GetAllExecutorInExecutorPoolByTypeService.java b/executor-pool/src/main/java/ch/unisg/executorpool/application/service/GetAllExecutorsInExecutorPoolByTypeService.java
similarity index 56%
rename from executor-pool/src/main/java/ch/unisg/executorpool/application/service/GetAllExecutorInExecutorPoolByTypeService.java
rename to executor-pool/src/main/java/ch/unisg/executorpool/application/service/GetAllExecutorsInExecutorPoolByTypeService.java
index 74988b2..00d1636 100644
--- a/executor-pool/src/main/java/ch/unisg/executorpool/application/service/GetAllExecutorInExecutorPoolByTypeService.java
+++ b/executor-pool/src/main/java/ch/unisg/executorpool/application/service/GetAllExecutorsInExecutorPoolByTypeService.java
@@ -1,7 +1,7 @@
package ch.unisg.executorpool.application.service;
-import ch.unisg.executorpool.application.port.in.GetAllExecutorInExecutorPoolByTypeQuery;
-import ch.unisg.executorpool.application.port.in.GetAllExecutorInExecutorPoolByTypeUseCase;
+import ch.unisg.executorpool.application.port.in.GetAllExecutorsInExecutorPoolByTypeQuery;
+import ch.unisg.executorpool.application.port.in.GetAllExecutorsInExecutorPoolByTypeUseCase;
import ch.unisg.executorpool.domain.ExecutorClass;
import ch.unisg.executorpool.domain.ExecutorPool;
import lombok.RequiredArgsConstructor;
@@ -13,10 +13,10 @@ import java.util.List;
@RequiredArgsConstructor
@Component
@Transactional
-public class GetAllExecutorInExecutorPoolByTypeService implements GetAllExecutorInExecutorPoolByTypeUseCase {
+public class GetAllExecutorsInExecutorPoolByTypeService implements GetAllExecutorsInExecutorPoolByTypeUseCase {
@Override
- public List getAllExecutorInExecutorPoolByType(GetAllExecutorInExecutorPoolByTypeQuery query){
+ public List getAllExecutorsInExecutorPoolByType(GetAllExecutorsInExecutorPoolByTypeQuery query){
ExecutorPool executorPool = ExecutorPool.getExecutorPool();
return executorPool.getAllExecutorsByType(query.getExecutorTaskType());
}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/application/service/RemoveExecutorFromExecutorPoolService.java b/executor-pool/src/main/java/ch/unisg/executorpool/application/service/RemoveExecutorFromExecutorPoolService.java
index 639ba7f..4d2457d 100644
--- a/executor-pool/src/main/java/ch/unisg/executorpool/application/service/RemoveExecutorFromExecutorPoolService.java
+++ b/executor-pool/src/main/java/ch/unisg/executorpool/application/service/RemoveExecutorFromExecutorPoolService.java
@@ -2,21 +2,36 @@ package ch.unisg.executorpool.application.service;
import ch.unisg.executorpool.application.port.in.RemoveExecutorFromExecutorPoolCommand;
import ch.unisg.executorpool.application.port.in.RemoveExecutorFromExecutorPoolUseCase;
+import ch.unisg.executorpool.application.port.out.ExecutorRemovedEventPort;
import ch.unisg.executorpool.domain.ExecutorClass;
import ch.unisg.executorpool.domain.ExecutorPool;
+import ch.unisg.executorpool.domain.ExecutorRemovedEvent;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import javax.transaction.Transactional;
import java.util.Optional;
-@RequiredArgsConstructor
@Component
@Transactional
public class RemoveExecutorFromExecutorPoolService implements RemoveExecutorFromExecutorPoolUseCase {
+
+ private final ExecutorRemovedEventPort executorRemovedEventPort;
+
+ public RemoveExecutorFromExecutorPoolService(ExecutorRemovedEventPort executorRemovedEventPort){
+ this.executorRemovedEventPort = executorRemovedEventPort;
+ }
+
@Override
public Optional removeExecutorFromExecutorPool(RemoveExecutorFromExecutorPoolCommand command){
ExecutorPool executorPool = ExecutorPool.getExecutorPool();
- return executorPool.removeExecutorByIpAndPort(command.getExecutorIp(), command.getExecutorPort());
+ var removedExecutor = executorPool.removeExecutorByIpAndPort(command.getExecutorUri());
+
+ if(removedExecutor.isPresent()){
+ var executorRemovedEvent = new ExecutorRemovedEvent(removedExecutor.get());
+ executorRemovedEventPort.publishExecutorRemovedEvent(executorRemovedEvent);
+ }
+
+ return removedExecutor;
}
}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/domain/ExecutorAddedEvent.java b/executor-pool/src/main/java/ch/unisg/executorpool/domain/ExecutorAddedEvent.java
new file mode 100644
index 0000000..6ec291e
--- /dev/null
+++ b/executor-pool/src/main/java/ch/unisg/executorpool/domain/ExecutorAddedEvent.java
@@ -0,0 +1,10 @@
+package ch.unisg.executorpool.domain;
+
+import lombok.Getter;
+
+public class ExecutorAddedEvent {
+ @Getter
+ private ExecutorClass executorClass;
+
+ public ExecutorAddedEvent(ExecutorClass executorClass) { this.executorClass = executorClass; }
+}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/domain/ExecutorClass.java b/executor-pool/src/main/java/ch/unisg/executorpool/domain/ExecutorClass.java
index d1fca00..5da6fe7 100644
--- a/executor-pool/src/main/java/ch/unisg/executorpool/domain/ExecutorClass.java
+++ b/executor-pool/src/main/java/ch/unisg/executorpool/domain/ExecutorClass.java
@@ -3,36 +3,29 @@ package ch.unisg.executorpool.domain;
import lombok.Getter;
import lombok.Value;
+import java.net.URI;
+
public class ExecutorClass {
@Getter
- private final ExecutorIp executorIp;
-
- @Getter
- private final ExecutorPort executorPort;
+ private final ExecutorUri executorUri;
@Getter
private final ExecutorTaskType executorTaskType;
- public ExecutorClass(ExecutorIp executorIp, ExecutorPort executorPort, ExecutorTaskType executorTaskType){
- this.executorIp = executorIp;
- this.executorPort = executorPort;
+ public ExecutorClass(ExecutorUri executorUri, ExecutorTaskType executorTaskType){
+ this.executorUri = executorUri;
this.executorTaskType = executorTaskType;
}
- protected static ExecutorClass createExecutorClass(ExecutorIp executorIp, ExecutorPort executorPort, ExecutorTaskType executorTaskType){
- System.out.println("New Task: " + executorIp.getValue() + " " + executorPort.getValue() + " " + executorTaskType.getValue());
- return new ExecutorClass(executorIp, executorPort, executorTaskType);
+ protected static ExecutorClass createExecutorClass(ExecutorUri executorUri, ExecutorTaskType executorTaskType){
+ System.out.println("New Executor: " + executorUri.value.toString() + " " + executorTaskType.getValue());
+ return new ExecutorClass(executorUri, executorTaskType);
}
@Value
- public static class ExecutorIp {
- private String value;
- }
-
- @Value
- public static class ExecutorPort {
- private String value;
+ public static class ExecutorUri {
+ private URI value;
}
@Value
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/domain/ExecutorPool.java b/executor-pool/src/main/java/ch/unisg/executorpool/domain/ExecutorPool.java
index dd5375b..0ca0d5e 100644
--- a/executor-pool/src/main/java/ch/unisg/executorpool/domain/ExecutorPool.java
+++ b/executor-pool/src/main/java/ch/unisg/executorpool/domain/ExecutorPool.java
@@ -1,5 +1,8 @@
package ch.unisg.executorpool.domain;
+import ch.unisg.executorpool.domain.ExecutorClass.ExecutorUri;
+import ch.unisg.executorpool.domain.ExecutorClass.ExecutorTaskType;
+
import lombok.Getter;
import lombok.Value;
@@ -20,19 +23,17 @@ public class ExecutorPool {
public static ExecutorPool getExecutorPool() { return executorPool; }
- public ExecutorClass addNewExecutor(ExecutorClass.ExecutorIp executorIp, ExecutorClass.ExecutorPort executorPort, ExecutorClass.ExecutorTaskType executorTaskType){
- ExecutorClass newExecutor = ExecutorClass.createExecutorClass(executorIp, executorPort, executorTaskType);
+ public ExecutorClass addNewExecutor(ExecutorUri executorUri, ExecutorTaskType executorTaskType){
+ ExecutorClass newExecutor = ExecutorClass.createExecutorClass(executorUri, executorTaskType);
listOfExecutors.value.add(newExecutor);
System.out.println("Number of executors: " + listOfExecutors.value.size());
return newExecutor;
}
- public Optional getExecutorByIpAndPort(ExecutorClass.ExecutorIp executorIp, ExecutorClass.ExecutorPort executorPort){
+ public Optional getExecutorByUri(ExecutorUri executorUri){
for (ExecutorClass executor : listOfExecutors.value ) {
- // TODO can this be simplified by overwriting equals()?
- if(executor.getExecutorIp().getValue().equalsIgnoreCase(executorIp.getValue()) &&
- executor.getExecutorPort().getValue().equalsIgnoreCase(executorPort.getValue())){
+ if(executor.getExecutorUri().getValue().equals(executorUri)){
return Optional.of(executor);
}
}
@@ -54,11 +55,10 @@ public class ExecutorPool {
return matchedExecutors;
}
- public Optional removeExecutorByIpAndPort(ExecutorClass.ExecutorIp executorIp, ExecutorClass.ExecutorPort executorPort){
+ public Optional removeExecutorByIpAndPort(ExecutorUri executorUri){
for (ExecutorClass executor : listOfExecutors.value ) {
// TODO can this be simplified by overwriting equals()?
- if(executor.getExecutorIp().getValue().equalsIgnoreCase(executorIp.getValue()) &&
- executor.getExecutorPort().getValue().equalsIgnoreCase(executorPort.getValue())){
+ if(executor.getExecutorUri().getValue().equals(executorUri.getValue())){
listOfExecutors.value.remove(executor);
return Optional.of(executor);
}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/domain/ExecutorRemovedEvent.java b/executor-pool/src/main/java/ch/unisg/executorpool/domain/ExecutorRemovedEvent.java
new file mode 100644
index 0000000..a038928
--- /dev/null
+++ b/executor-pool/src/main/java/ch/unisg/executorpool/domain/ExecutorRemovedEvent.java
@@ -0,0 +1,11 @@
+package ch.unisg.executorpool.domain;
+
+import ch.unisg.executorpool.domain.ExecutorClass;
+import lombok.Getter;
+
+public class ExecutorRemovedEvent {
+ @Getter
+ private ExecutorClass executorClass;
+
+ public ExecutorRemovedEvent(ExecutorClass executorClass) { this.executorClass = executorClass; }
+}
diff --git a/executor-pool/src/main/resources/application.properties b/executor-pool/src/main/resources/application.properties
index 8f91ca7..0c9ba7e 100644
--- a/executor-pool/src/main/resources/application.properties
+++ b/executor-pool/src/main/resources/application.properties
@@ -1 +1,3 @@
server.port=8083
+
+mqtt.broker.uri=tcp://localhost:1883
diff --git a/tapas-auction-house/src/main/java/ch/unisg/tapas/TapasAuctionHouseApplication.java b/tapas-auction-house/src/main/java/ch/unisg/tapas/TapasAuctionHouseApplication.java
index 1f958d9..7438032 100644
--- a/tapas-auction-house/src/main/java/ch/unisg/tapas/TapasAuctionHouseApplication.java
+++ b/tapas-auction-house/src/main/java/ch/unisg/tapas/TapasAuctionHouseApplication.java
@@ -4,10 +4,12 @@ import ch.unisg.tapas.auctionhouse.adapter.common.clients.TapasMqttClient;
import ch.unisg.tapas.auctionhouse.adapter.in.messaging.mqtt.AuctionEventsMqttDispatcher;
import ch.unisg.tapas.auctionhouse.adapter.common.clients.WebSubSubscriber;
import ch.unisg.tapas.common.AuctionHouseResourceDirectory;
+import ch.unisg.tapas.common.ConfigProperties;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -21,15 +23,18 @@ import java.util.List;
public class TapasAuctionHouseApplication {
private static final Logger LOGGER = LogManager.getLogger(TapasAuctionHouseApplication.class);
+ @Autowired
+ private ConfigProperties config;
+
public static String RESOURCE_DIRECTORY = "https://api.interactions.ics.unisg.ch/auction-houses/";
- public static String MQTT_BROKER = "tcp://broker.hivemq.com:1883";
+ public static String MQTT_BROKER = "tcp://localhost:1883";
public static void main(String[] args) {
SpringApplication tapasAuctioneerApp = new SpringApplication(TapasAuctionHouseApplication.class);
// We will use these bootstrap methods in Week 6:
bootstrapMarketplaceWithWebSub();
- // bootstrapMarketplaceWithMqtt();
+ bootstrapMarketplaceWithMqtt();
tapasAuctioneerApp.run(args);
}
diff --git a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/common/clients/TapasMqttClient.java b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/common/clients/TapasMqttClient.java
index 708d512..1a30bc4 100644
--- a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/common/clients/TapasMqttClient.java
+++ b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/common/clients/TapasMqttClient.java
@@ -68,7 +68,10 @@ public class TapasMqttClient {
mqttClient.subscribe(topic);
}
- private void publishMessage(String topic, String payload) throws MqttException {
+ public void publishMessage(String topic, String payload) throws MqttException {
+ mqttClient = new org.eclipse.paho.client.mqttv3.MqttClient(brokerAddress, mqttClientId, new MemoryPersistence());
+ mqttClient.connect();
+
MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
mqttClient.publish(topic, message);
}
diff --git a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/common/formats/AuctionJsonRepresentation.java b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/common/formats/AuctionJsonRepresentation.java
index 4500423..ea4cf2c 100644
--- a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/common/formats/AuctionJsonRepresentation.java
+++ b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/common/formats/AuctionJsonRepresentation.java
@@ -7,6 +7,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;
+import java.sql.Timestamp;
+
/**
* Used to expose a representation of the state of an auction through an interface. This class is
* only meant as a starting point when defining a uniform HTTP API for the Auction House: feel free
@@ -28,12 +30,12 @@ public class AuctionJsonRepresentation {
private String taskType;
@Getter @Setter
- private Integer deadline;
+ private Timestamp deadline;
public AuctionJsonRepresentation() { }
public AuctionJsonRepresentation(String auctionId, String auctionHouseUri, String taskUri,
- String taskType, Integer deadline) {
+ String taskType, Timestamp deadline) {
this.auctionId = auctionId;
this.auctionHouseUri = auctionHouseUri;
this.taskUri = taskUri;
diff --git a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/in/messaging/http/ExecutorAddedEventListenerHttpAdapter.java b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/in/messaging/http/ExecutorAddedEventListenerHttpAdapter.java
deleted file mode 100644
index 3511b7d..0000000
--- a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/in/messaging/http/ExecutorAddedEventListenerHttpAdapter.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package ch.unisg.tapas.auctionhouse.adapter.in.messaging.http;
-
-import ch.unisg.tapas.auctionhouse.application.handler.ExecutorAddedHandler;
-import ch.unisg.tapas.auctionhouse.application.port.in.ExecutorAddedEvent;
-import ch.unisg.tapas.auctionhouse.domain.Auction;
-import ch.unisg.tapas.auctionhouse.domain.ExecutorRegistry;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RestController;
-
-/**
- * Template for receiving an executor added event via HTTP
- */
-@RestController
-public class ExecutorAddedEventListenerHttpAdapter {
-
- @PostMapping(path = "/executors/{taskType}/{executorId}")
- public ResponseEntity handleExecutorAddedEvent(@PathVariable("taskType") String taskType,
- @PathVariable("executorId") String executorId) {
-
- ExecutorAddedEvent executorAddedEvent = new ExecutorAddedEvent(
- new ExecutorRegistry.ExecutorIdentifier(executorId),
- new Auction.AuctionedTaskType(taskType)
- );
-
- ExecutorAddedHandler newExecutorHandler = new ExecutorAddedHandler();
- newExecutorHandler.handleNewExecutorEvent(executorAddedEvent);
-
- return new ResponseEntity<>(HttpStatus.NO_CONTENT);
- }
-}
diff --git a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/in/messaging/http/ExecutorRemovedEventListenerHttpAdapter.java b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/in/messaging/http/ExecutorRemovedEventListenerHttpAdapter.java
deleted file mode 100644
index 53811f9..0000000
--- a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/in/messaging/http/ExecutorRemovedEventListenerHttpAdapter.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package ch.unisg.tapas.auctionhouse.adapter.in.messaging.http;
-
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.RestController;
-
-/**
- * Template for handling an executor removed event received via an HTTP request
- */
-@RestController
-public class ExecutorRemovedEventListenerHttpAdapter {
-
- // TODO: add annotations for request method, request URI, etc.
- public void handleExecutorRemovedEvent(@PathVariable("executorId") String executorId) {
- // TODO: implement logic
- }
-}
diff --git a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/in/messaging/mqtt/AuctionEventsMqttDispatcher.java b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/in/messaging/mqtt/AuctionEventsMqttDispatcher.java
index e5eaf12..3e55d5e 100644
--- a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/in/messaging/mqtt/AuctionEventsMqttDispatcher.java
+++ b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/in/messaging/mqtt/AuctionEventsMqttDispatcher.java
@@ -26,7 +26,7 @@ public class AuctionEventsMqttDispatcher {
// TODO: Register here your topics and event listener adapters
private void initRouter() {
- router.put("ch/unisg/tapas-group-tutors/executors", new ExecutorAddedEventListenerMqttAdapter());
+ router.put("ch/unisg/tapas/executors/added", new ExecutorAddedEventListenerMqttAdapter());
}
/**
diff --git a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/in/messaging/mqtt/ExecutorAddedEventListenerMqttAdapter.java b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/in/messaging/mqtt/ExecutorAddedEventListenerMqttAdapter.java
index 2f661d1..dd2d120 100644
--- a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/in/messaging/mqtt/ExecutorAddedEventListenerMqttAdapter.java
+++ b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/in/messaging/mqtt/ExecutorAddedEventListenerMqttAdapter.java
@@ -11,6 +11,8 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.paho.client.mqttv3.MqttMessage;
+import java.net.URI;
+
/**
* Listener that handles events when an executor was added to this TAPAS application.
*
@@ -24,16 +26,16 @@ public class ExecutorAddedEventListenerMqttAdapter extends AuctionEventMqttListe
String payload = new String(message.getPayload());
try {
- // Note: this messge representation is provided only as an example. You should use a
+ // Note: this message representation is provided only as an example. You should use a
// representation that makes sense in the context of your application.
JsonNode data = new ObjectMapper().readTree(payload);
- String taskType = data.get("taskType").asText();
- String executorId = data.get("executorId").asText();
+ String executorUri = data.get("executorUri").asText();
+ String executorTaskType = data.get("executorTaskType").asText();
ExecutorAddedEvent executorAddedEvent = new ExecutorAddedEvent(
- new ExecutorRegistry.ExecutorIdentifier(executorId),
- new Auction.AuctionedTaskType(taskType)
+ new ExecutorRegistry.ExecutorUri(URI.create(executorUri)),
+ new Auction.AuctionedTaskType(executorTaskType)
);
ExecutorAddedHandler newExecutorHandler = new ExecutorAddedHandler();
diff --git a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/in/messaging/mqtt/ExecutorRemovedEventListenerMqttAdapter.java b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/in/messaging/mqtt/ExecutorRemovedEventListenerMqttAdapter.java
new file mode 100644
index 0000000..087479c
--- /dev/null
+++ b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/in/messaging/mqtt/ExecutorRemovedEventListenerMqttAdapter.java
@@ -0,0 +1,46 @@
+package ch.unisg.tapas.auctionhouse.adapter.in.messaging.mqtt;
+
+import ch.unisg.tapas.auctionhouse.application.handler.ExecutorRemovedHandler;
+import ch.unisg.tapas.auctionhouse.application.port.in.ExecutorRemovedEvent;
+import ch.unisg.tapas.auctionhouse.domain.Auction;
+import ch.unisg.tapas.auctionhouse.domain.ExecutorRegistry;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+/**
+ * Listener that handles events when an executor was removed to this TAPAS application.
+ *
+ * This class is only provided as an example to help you bootstrap the project.
+ */
+public class ExecutorRemovedEventListenerMqttAdapter extends AuctionEventMqttListener {
+ private static final Logger LOGGER = LogManager.getLogger(ExecutorRemovedEventListenerMqttAdapter.class);
+
+ @Override
+ public boolean handleEvent(MqttMessage message) {
+ String payload = new String(message.getPayload());
+
+ try {
+ // Note: this messge representation is provided only as an example. You should use a
+ // representation that makes sense in the context of your application.
+ JsonNode data = new ObjectMapper().readTree(payload);
+
+ String executorId = data.get("executorId").asText();
+
+ ExecutorRemovedEvent executorRemovedEvent = new ExecutorRemovedEvent(
+ new ExecutorRegistry.ExecutorIdentifier(executorId)
+ );
+
+ ExecutorRemovedHandler newExecutorHandler = new ExecutorRemovedHandler();
+ newExecutorHandler.handleNewExecutorEvent(executorRemovedEvent);
+ } catch (JsonProcessingException | NullPointerException e) {
+ LOGGER.error(e.getMessage(), e);
+ return false;
+ }
+
+ return true;
+ }
+}
diff --git a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/out/messaging/websub/PublishAuctionStartedEventMqttAdapter.java b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/out/messaging/websub/PublishAuctionStartedEventMqttAdapter.java
new file mode 100644
index 0000000..d5bb0fc
--- /dev/null
+++ b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/out/messaging/websub/PublishAuctionStartedEventMqttAdapter.java
@@ -0,0 +1,36 @@
+package ch.unisg.tapas.auctionhouse.adapter.out.messaging.websub;
+
+import ch.unisg.tapas.auctionhouse.adapter.common.clients.TapasMqttClient;
+import ch.unisg.tapas.auctionhouse.adapter.common.formats.AuctionJsonRepresentation;
+import ch.unisg.tapas.auctionhouse.adapter.in.messaging.mqtt.AuctionEventsMqttDispatcher;
+import ch.unisg.tapas.auctionhouse.application.port.out.AuctionStartedEventPort;
+import ch.unisg.tapas.auctionhouse.domain.AuctionStartedEvent;
+import ch.unisg.tapas.common.ConfigProperties;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Primary;
+import org.springframework.stereotype.Component;
+
+@Component
+@Primary
+public class PublishAuctionStartedEventMqttAdapter implements AuctionStartedEventPort {
+
+ private static final Logger LOGGER = LogManager.getLogger(PublishAuctionStartedEventMqttAdapter.class);
+
+ @Autowired
+ private ConfigProperties config;
+
+ @Override
+ public void publishAuctionStartedEvent(AuctionStartedEvent event) {
+ try{
+ var mqttClient = TapasMqttClient.getInstance(config.getMqttBrokerUri().toString(), new AuctionEventsMqttDispatcher());
+ mqttClient.publishMessage("ch/unisg/tapas/auctions", AuctionJsonRepresentation.serialize(event.getAuction()));
+ }
+ catch (MqttException | JsonProcessingException e){
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+}
diff --git a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/out/messaging/websub/PublishAuctionStartedEventWebSubAdapter.java b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/out/messaging/websub/PublishAuctionStartedEventWebSubAdapter.java
index 01350d3..228f43b 100644
--- a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/out/messaging/websub/PublishAuctionStartedEventWebSubAdapter.java
+++ b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/adapter/out/messaging/websub/PublishAuctionStartedEventWebSubAdapter.java
@@ -29,7 +29,6 @@ import java.util.stream.Collectors;
* This class is a template for publishing auction started events via WebSub.
*/
@Component
-@Primary
public class PublishAuctionStartedEventWebSubAdapter implements AuctionStartedEventPort {
// You can use this object to retrieve properties from application.properties, e.g. the
// WebSub hub publish endpoint, etc.
diff --git a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/application/handler/ExecutorAddedHandler.java b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/application/handler/ExecutorAddedHandler.java
index 624e669..fc30e11 100644
--- a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/application/handler/ExecutorAddedHandler.java
+++ b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/application/handler/ExecutorAddedHandler.java
@@ -11,6 +11,6 @@ public class ExecutorAddedHandler implements ExecutorAddedEventHandler {
@Override
public boolean handleNewExecutorEvent(ExecutorAddedEvent executorAddedEvent) {
return ExecutorRegistry.getInstance().addExecutor(executorAddedEvent.getTaskType(),
- executorAddedEvent.getExecutorId());
+ executorAddedEvent.getExecutorUri());
}
}
diff --git a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/application/handler/ExecutorRemovedHandler.java b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/application/handler/ExecutorRemovedHandler.java
index c3bfed8..f63950d 100644
--- a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/application/handler/ExecutorRemovedHandler.java
+++ b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/application/handler/ExecutorRemovedHandler.java
@@ -14,6 +14,9 @@ public class ExecutorRemovedHandler implements ExecutorRemovedEventHandler {
@Override
public boolean handleExecutorRemovedEvent(ExecutorRemovedEvent executorRemovedEvent) {
- return ExecutorRegistry.getInstance().removeExecutor(executorRemovedEvent.getExecutorId());
+ return ExecutorRegistry.getInstance().removeExecutor(executorRemovedEvent.getExecutorUri());
+ }
+
+ public void handleNewExecutorEvent(ExecutorRemovedEvent executorRemovedEvent) {
}
}
diff --git a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/application/port/in/ExecutorAddedEvent.java b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/application/port/in/ExecutorAddedEvent.java
index 5a53b94..7d647e1 100644
--- a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/application/port/in/ExecutorAddedEvent.java
+++ b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/application/port/in/ExecutorAddedEvent.java
@@ -1,7 +1,8 @@
package ch.unisg.tapas.auctionhouse.application.port.in;
import ch.unisg.tapas.auctionhouse.domain.Auction.AuctionedTaskType;
-import ch.unisg.tapas.auctionhouse.domain.ExecutorRegistry.ExecutorIdentifier;
+import ch.unisg.tapas.auctionhouse.domain.ExecutorRegistry;
+import ch.unisg.tapas.auctionhouse.domain.ExecutorRegistry.ExecutorUri;
import ch.unisg.tapas.common.SelfValidating;
import lombok.Value;
@@ -13,7 +14,7 @@ import javax.validation.constraints.NotNull;
@Value
public class ExecutorAddedEvent extends SelfValidating {
@NotNull
- private final ExecutorIdentifier executorId;
+ private final ExecutorRegistry.ExecutorUri executorUri;
@NotNull
private final AuctionedTaskType taskType;
@@ -21,10 +22,10 @@ public class ExecutorAddedEvent extends SelfValidating {
/**
* Constructs an executor added event.
*
- * @param executorId the identifier of the executor that was added to this TAPAS application
+ * @param executorUri the identifier of the executor that was added to this TAPAS application
*/
- public ExecutorAddedEvent(ExecutorIdentifier executorId, AuctionedTaskType taskType) {
- this.executorId = executorId;
+ public ExecutorAddedEvent(ExecutorUri executorUri, AuctionedTaskType taskType) {
+ this.executorUri = executorUri;
this.taskType = taskType;
this.validateSelf();
diff --git a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/application/port/in/ExecutorRemovedEvent.java b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/application/port/in/ExecutorRemovedEvent.java
index 4d5c910..a1633fe 100644
--- a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/application/port/in/ExecutorRemovedEvent.java
+++ b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/application/port/in/ExecutorRemovedEvent.java
@@ -1,6 +1,7 @@
package ch.unisg.tapas.auctionhouse.application.port.in;
-import ch.unisg.tapas.auctionhouse.domain.ExecutorRegistry.ExecutorIdentifier;
+import ch.unisg.tapas.auctionhouse.domain.ExecutorRegistry;
+import ch.unisg.tapas.auctionhouse.domain.ExecutorRegistry.ExecutorUri;
import ch.unisg.tapas.common.SelfValidating;
import lombok.Value;
@@ -12,15 +13,15 @@ import javax.validation.constraints.NotNull;
@Value
public class ExecutorRemovedEvent extends SelfValidating {
@NotNull
- private final ExecutorIdentifier executorId;
+ private final ExecutorUri executorUri;
/**
* Constructs an executor removed event.
*
- * @param executorId the identifier of the executor that was removed from this TAPAS application
+ * @param executorUri
*/
- public ExecutorRemovedEvent(ExecutorIdentifier executorId) {
- this.executorId = executorId;
+ public ExecutorRemovedEvent(ExecutorUri executorUri) {
+ this.executorUri = executorUri;
this.validateSelf();
}
}
diff --git a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/application/service/StartAuctionService.java b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/application/service/StartAuctionService.java
index 42c6e37..60c5f24 100644
--- a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/application/service/StartAuctionService.java
+++ b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/application/service/StartAuctionService.java
@@ -2,6 +2,7 @@ package ch.unisg.tapas.auctionhouse.application.service;
import ch.unisg.tapas.auctionhouse.application.port.in.LaunchAuctionCommand;
import ch.unisg.tapas.auctionhouse.application.port.in.LaunchAuctionUseCase;
+import ch.unisg.tapas.auctionhouse.application.port.in.LaunchAuctionUseCase;
import ch.unisg.tapas.auctionhouse.application.port.out.AuctionWonEventPort;
import ch.unisg.tapas.auctionhouse.application.port.out.AuctionStartedEventPort;
import ch.unisg.tapas.auctionhouse.domain.*;
@@ -11,6 +12,7 @@ import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import java.sql.Timestamp;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -25,7 +27,7 @@ import java.util.concurrent.TimeUnit;
public class StartAuctionService implements LaunchAuctionUseCase {
private static final Logger LOGGER = LogManager.getLogger(StartAuctionService.class);
- private final static int DEFAULT_AUCTION_DEADLINE_MILLIS = 10000;
+ private final Timestamp DEFAULT_AUCTION_DEADLINE_MILLIS = Timestamp.valueOf("1970-01-01 00:00:01");
// Event port used to publish an auction started event
private final AuctionStartedEventPort auctionStartedEventPort;
@@ -63,7 +65,7 @@ public class StartAuctionService implements LaunchAuctionUseCase {
auctions.addAuction(auction);
// Schedule the closing of the auction at the deadline
- service.schedule(new CloseAuctionTask(auction.getAuctionId()), deadline.getValue(),
+ service.schedule(new CloseAuctionTask(auction.getAuctionId()), deadline.getValue().getTime() - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
// Publish an auction started event
diff --git a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/domain/Auction.java b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/domain/Auction.java
index 3e51ef7..c6d9333 100644
--- a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/domain/Auction.java
+++ b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/domain/Auction.java
@@ -4,6 +4,7 @@ import lombok.Getter;
import lombok.Value;
import java.net.URI;
+import java.sql.Timestamp;
import java.util.*;
/**
@@ -166,6 +167,6 @@ public class Auction {
@Value
public static class AuctionDeadline {
- int value;
+ Timestamp value;
}
}
diff --git a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/domain/ExecutorRegistry.java b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/domain/ExecutorRegistry.java
index 9da3756..1aedc80 100644
--- a/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/domain/ExecutorRegistry.java
+++ b/tapas-auction-house/src/main/java/ch/unisg/tapas/auctionhouse/domain/ExecutorRegistry.java
@@ -2,6 +2,7 @@ package ch.unisg.tapas.auctionhouse.domain;
import lombok.Value;
+import java.net.URI;
import java.util.*;
/**
@@ -13,7 +14,7 @@ import java.util.*;
public class ExecutorRegistry {
private static ExecutorRegistry registry;
- private final Map> executors;
+ private final Map> executors;
private ExecutorRegistry() {
this.executors = new Hashtable<>();
@@ -31,14 +32,14 @@ public class ExecutorRegistry {
* Adds an executor to the registry for a given task type.
*
* @param taskType the type of the task
- * @param executorIdentifier the identifier of the executor (can be any string)
+ * @param executorUri the executor's URI
* @return true unless a runtime exception occurs
*/
- public boolean addExecutor(Auction.AuctionedTaskType taskType, ExecutorIdentifier executorIdentifier) {
- Set taskTypeExecs = executors.getOrDefault(taskType,
+ public boolean addExecutor(Auction.AuctionedTaskType taskType, ExecutorUri executorUri) {
+ Set taskTypeExecs = executors.getOrDefault(taskType,
Collections.synchronizedSet(new HashSet<>()));
- taskTypeExecs.add(executorIdentifier);
+ taskTypeExecs.add(executorUri);
executors.put(taskType, taskTypeExecs);
return true;
@@ -47,17 +48,17 @@ public class ExecutorRegistry {
/**
* Removes an executor from the registry. The executor is disassociated from all known task types.
*
- * @param executorIdentifier the identifier of the executor (can be any string)
+ * @param executorUri the executor's URI
* @return true unless a runtime exception occurs
*/
- public boolean removeExecutor(ExecutorIdentifier executorIdentifier) {
+ public boolean removeExecutor(ExecutorUri executorUri) {
Iterator iterator = executors.keySet().iterator();
while (iterator.hasNext()) {
Auction.AuctionedTaskType taskType = iterator.next();
- Set set = executors.get(taskType);
+ Set set = executors.get(taskType);
- set.remove(executorIdentifier);
+ set.remove(executorUri);
if (set.isEmpty()) {
iterator.remove();
@@ -80,7 +81,7 @@ public class ExecutorRegistry {
// Value Object for the executor identifier
@Value
- public static class ExecutorIdentifier {
- String value;
+ public static class ExecutorUri {
+ URI value;
}
}
diff --git a/tapas-auction-house/src/main/java/ch/unisg/tapas/common/ConfigProperties.java b/tapas-auction-house/src/main/java/ch/unisg/tapas/common/ConfigProperties.java
index 748afda..2933465 100644
--- a/tapas-auction-house/src/main/java/ch/unisg/tapas/common/ConfigProperties.java
+++ b/tapas-auction-house/src/main/java/ch/unisg/tapas/common/ConfigProperties.java
@@ -61,4 +61,14 @@ public class ConfigProperties {
public URI getTaskListUri() {
return URI.create(environment.getProperty("tasks.list.uri"));
}
+
+
+ /**
+ * Retrieves the URI of the MQTT broker.
+ *
+ * @return the URI of the MQTT broker
+ */
+ public URI getMqttBrokerUri() {
+ return URI.create(environment.getProperty("mqtt.broker.uri"));
+ }
}
diff --git a/tapas-auction-house/src/main/resources/application.properties b/tapas-auction-house/src/main/resources/application.properties
index 96e231c..706362e 100644
--- a/tapas-auction-house/src/main/resources/application.properties
+++ b/tapas-auction-house/src/main/resources/application.properties
@@ -10,3 +10,4 @@ tasks.list.uri=https://tapas-tasks.86-119-34-23.nip.io/
application.environment=development
auctionhouse.uri=http://localhost:8086
websub.hub.uri=http://localhost:3000
+mqtt.broker.uri=tcp://localhost:1883
diff --git a/tapas-tasks/src/main/java/ch/unisg/tapastasks/tasks/application/service/DeleteTaskService.java b/tapas-tasks/src/main/java/ch/unisg/tapastasks/tasks/application/service/DeleteTaskService.java
index f865f4c..35685a3 100644
--- a/tapas-tasks/src/main/java/ch/unisg/tapastasks/tasks/application/service/DeleteTaskService.java
+++ b/tapas-tasks/src/main/java/ch/unisg/tapastasks/tasks/application/service/DeleteTaskService.java
@@ -5,6 +5,7 @@ import ch.unisg.tapastasks.tasks.application.port.in.DeleteTaskCommand;
import ch.unisg.tapastasks.tasks.application.port.in.DeleteTaskUseCase;
import ch.unisg.tapastasks.tasks.domain.Task;
import ch.unisg.tapastasks.tasks.domain.TaskList;
+import jdk.jshell.spi.ExecutionControl;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
@@ -23,11 +24,10 @@ public class DeleteTaskService implements DeleteTaskUseCase {
Optional updatedTask = taskList.retrieveTaskById(command.getTaskId());
Task newTask = updatedTask.get();
// TODO: Fill in the right condition into the if-statement and the else-statement
- if (/*the task can be deleted*/){
+ if (true){
return taskList.deleteTaskById(command.getTaskId());
- } else {
- /*send message back to TaskList that the task cannot be deleted*/
}
-
+ // TODO Handle with a return message
+ return Optional.empty();
}
}