diff --git a/executor-pool/pom.xml b/executor-pool/pom.xml
index 2e75dcc..512235d 100644
--- a/executor-pool/pom.xml
+++ b/executor-pool/pom.xml
@@ -63,6 +63,12 @@
javax.transaction-api
compile
+
+ org.eclipse.paho
+ org.eclipse.paho.client.mqttv3
+ 1.2.5
+ compile
+
diff --git a/executor-pool/src/main/java/ch/unisg/common/ConfigProperties.java b/executor-pool/src/main/java/ch/unisg/common/ConfigProperties.java
new file mode 100644
index 0000000..b46bf63
--- /dev/null
+++ b/executor-pool/src/main/java/ch/unisg/common/ConfigProperties.java
@@ -0,0 +1,23 @@
+package ch.unisg.common;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
+import org.springframework.stereotype.Component;
+
+import java.net.URI;
+
+@Component
+public class ConfigProperties {
+ @Autowired
+ private Environment environment;
+
+ /**
+ * Retrieves the URI of the WebSub hub. In this project, we use a single WebSub hub, but we could
+ * use multiple.
+ *
+ * @return the URI of the WebSub hub
+ */
+ public URI getMqttBrokerUri() {
+ return URI.create(environment.getProperty("mqtt.broker.uri"));
+ }
+}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/TestController.java b/executor-pool/src/main/java/ch/unisg/executorpool/TestController.java
deleted file mode 100644
index ca29e09..0000000
--- a/executor-pool/src/main/java/ch/unisg/executorpool/TestController.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package ch.unisg.executorpool;
-
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RestController;
-
-@RestController
-public class TestController {
- @RequestMapping("/")
- public String index() {
- return "Hello World! Executor Pool";
- }
-}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/adapter/common/clients/TapasMqttClient.java b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/common/clients/TapasMqttClient.java
new file mode 100644
index 0000000..0b24b81
--- /dev/null
+++ b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/common/clients/TapasMqttClient.java
@@ -0,0 +1,41 @@
+package ch.unisg.executorpool.adapter.common.clients;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.eclipse.paho.client.mqttv3.*;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+public class TapasMqttClient {
+ private static final Logger LOGGER = LogManager.getLogger(TapasMqttClient.class);
+
+ private static TapasMqttClient tapasClient = null;
+
+ private MqttClient mqttClient;
+ private final String mqttClientId;
+ private final String brokerAddress;
+
+ private TapasMqttClient(String brokerAddress) {
+ this.mqttClientId = UUID.randomUUID().toString();
+ this.brokerAddress = brokerAddress;
+ }
+
+ public static synchronized TapasMqttClient getInstance(String brokerAddress) {
+
+ if (tapasClient == null) {
+ tapasClient = new TapasMqttClient(brokerAddress);
+ }
+
+ return tapasClient;
+ }
+
+ public void publishMessage(String topic, String payload) throws MqttException {
+ mqttClient = new org.eclipse.paho.client.mqttv3.MqttClient(brokerAddress, mqttClientId, new MemoryPersistence());
+ mqttClient.connect();
+ MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
+ mqttClient.publish(topic, message);
+ mqttClient.disconnect();
+ }
+}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/AddNewExecutorToExecutorPoolWebController.java b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/AddNewExecutorToExecutorPoolWebController.java
index 5a2dc09..ff464d3 100644
--- a/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/AddNewExecutorToExecutorPoolWebController.java
+++ b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/AddNewExecutorToExecutorPoolWebController.java
@@ -1,5 +1,6 @@
package ch.unisg.executorpool.adapter.in.web;
+import ch.unisg.executorpool.adapter.common.clients.TapasMqttClient;
import ch.unisg.executorpool.adapter.common.formats.ExecutorJsonRepresentation;
import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolUseCase;
import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolCommand;
@@ -13,6 +14,10 @@ import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ResponseStatusException;
import javax.validation.ConstraintViolationException;
import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+import org.eclipse.paho.client.mqttv3.*;
@RestController
public class AddNewExecutorToExecutorPoolWebController {
@@ -24,7 +29,7 @@ public class AddNewExecutorToExecutorPoolWebController {
@PostMapping(path = "/executor-pool/AddExecutor", consumes = {ExecutorJsonRepresentation.EXECUTOR_MEDIA_TYPE})
public ResponseEntity addNewExecutorToExecutorPool(@RequestBody ExecutorJsonRepresentation payload){
- try{
+ try {
AddNewExecutorToExecutorPoolCommand command = new AddNewExecutorToExecutorPoolCommand(
new ExecutorClass.ExecutorUri(URI.create(payload.getExecutorUri())),
new ExecutorClass.ExecutorTaskType(payload.getExecutorTaskType())
@@ -36,6 +41,7 @@ public class AddNewExecutorToExecutorPoolWebController {
responseHeaders.add(HttpHeaders.CONTENT_TYPE, ExecutorJsonRepresentation.EXECUTOR_MEDIA_TYPE);
return new ResponseEntity<>(ExecutorJsonRepresentation.serialize(newExecutor), responseHeaders, HttpStatus.CREATED);
+
} catch (ConstraintViolationException e){
throw new ResponseStatusException(HttpStatus.BAD_REQUEST, e.getMessage());
}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/adapter/out/messaging/PublishExecutorAddedEventAdapter.java b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/out/messaging/PublishExecutorAddedEventAdapter.java
new file mode 100644
index 0000000..323bcbb
--- /dev/null
+++ b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/out/messaging/PublishExecutorAddedEventAdapter.java
@@ -0,0 +1,43 @@
+package ch.unisg.executorpool.adapter.out.messaging;
+
+import ch.unisg.common.ConfigProperties;
+import ch.unisg.executorpool.adapter.common.clients.TapasMqttClient;
+import ch.unisg.executorpool.adapter.common.formats.ExecutorJsonRepresentation;
+import ch.unisg.executorpool.application.port.out.ExecutorAddedEventPort;
+import ch.unisg.executorpool.domain.ExecutorAddedEvent;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Primary;
+import org.springframework.core.env.Environment;
+import org.springframework.stereotype.Component;
+
+import java.net.URI;
+
+@Component
+@Primary
+public class PublishExecutorAddedEventAdapter implements ExecutorAddedEventPort {
+
+ private static final Logger LOGGER = LogManager.getLogger(PublishExecutorAddedEventAdapter.class);
+
+ // TODO Can't autowire. Find fix
+ /*
+ @Autowired
+ private ConfigProperties config;
+ */
+
+ @Autowired
+ private Environment environment;
+
+ @Override
+ public void publishExecutorAddedEvent(ExecutorAddedEvent event){
+ try{
+ var mqttClient = TapasMqttClient.getInstance(environment.getProperty("mqtt.broker.uri"));
+ mqttClient.publishMessage("ch/unisg/tapas/executors/added", ExecutorJsonRepresentation.serialize(event.getExecutorClass()));
+ }
+ catch (MqttException e){
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/application/port/out/ExecutorAddedEventPort.java b/executor-pool/src/main/java/ch/unisg/executorpool/application/port/out/ExecutorAddedEventPort.java
new file mode 100644
index 0000000..ad75c75
--- /dev/null
+++ b/executor-pool/src/main/java/ch/unisg/executorpool/application/port/out/ExecutorAddedEventPort.java
@@ -0,0 +1,8 @@
+package ch.unisg.executorpool.application.port.out;
+
+import ch.unisg.executorpool.domain.ExecutorAddedEvent;
+import org.eclipse.paho.client.mqttv3.MqttException;
+
+public interface ExecutorAddedEventPort {
+ void publishExecutorAddedEvent(ExecutorAddedEvent event);
+}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/application/service/AddNewExecutorToExecutorPoolService.java b/executor-pool/src/main/java/ch/unisg/executorpool/application/service/AddNewExecutorToExecutorPoolService.java
index 200739b..393024a 100644
--- a/executor-pool/src/main/java/ch/unisg/executorpool/application/service/AddNewExecutorToExecutorPoolService.java
+++ b/executor-pool/src/main/java/ch/unisg/executorpool/application/service/AddNewExecutorToExecutorPoolService.java
@@ -2,24 +2,33 @@ package ch.unisg.executorpool.application.service;
import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolUseCase;
import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolCommand;
+import ch.unisg.executorpool.application.port.out.ExecutorAddedEventPort;
+import ch.unisg.executorpool.domain.ExecutorAddedEvent;
import ch.unisg.executorpool.domain.ExecutorClass;
import ch.unisg.executorpool.domain.ExecutorPool;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
-import org.yaml.snakeyaml.constructor.DuplicateKeyException;
import javax.transaction.Transactional;
-import javax.validation.ConstraintViolationException;
-@RequiredArgsConstructor
@Component
@Transactional
public class AddNewExecutorToExecutorPoolService implements AddNewExecutorToExecutorPoolUseCase {
+ private final ExecutorAddedEventPort executorAddedEventPort;
+
+ public AddNewExecutorToExecutorPoolService(ExecutorAddedEventPort executorAddedEventPort){
+ this.executorAddedEventPort = executorAddedEventPort;
+ }
+
@Override
public ExecutorClass addNewExecutorToExecutorPool(AddNewExecutorToExecutorPoolCommand command){
ExecutorPool executorPool = ExecutorPool.getExecutorPool();
+ var newExecutor = executorPool.addNewExecutor(command.getExecutorUri(), command.getExecutorTaskType());
- return executorPool.addNewExecutor(command.getExecutorUri(), command.getExecutorTaskType());
+ var executorAddedEvent = new ExecutorAddedEvent(newExecutor);
+ executorAddedEventPort.publishExecutorAddedEvent(executorAddedEvent);
+
+ return newExecutor;
}
}
diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/domain/ExecutorAddedEvent.java b/executor-pool/src/main/java/ch/unisg/executorpool/domain/ExecutorAddedEvent.java
new file mode 100644
index 0000000..6ec291e
--- /dev/null
+++ b/executor-pool/src/main/java/ch/unisg/executorpool/domain/ExecutorAddedEvent.java
@@ -0,0 +1,10 @@
+package ch.unisg.executorpool.domain;
+
+import lombok.Getter;
+
+public class ExecutorAddedEvent {
+ @Getter
+ private ExecutorClass executorClass;
+
+ public ExecutorAddedEvent(ExecutorClass executorClass) { this.executorClass = executorClass; }
+}
diff --git a/executor-pool/src/main/resources/application.properties b/executor-pool/src/main/resources/application.properties
index 8f91ca7..0c9ba7e 100644
--- a/executor-pool/src/main/resources/application.properties
+++ b/executor-pool/src/main/resources/application.properties
@@ -1 +1,3 @@
server.port=8083
+
+mqtt.broker.uri=tcp://localhost:1883