From 55c094fc56db6abc9c641fd45448af1e8bef342b Mon Sep 17 00:00:00 2001 From: reynisson Date: Sun, 14 Nov 2021 14:28:45 +0100 Subject: [PATCH] Implemented the new executor added event over mqtt --- executor-pool/pom.xml | 6 +++ .../ch/unisg/common/ConfigProperties.java | 23 ++++++++++ .../ch/unisg/executorpool/TestController.java | 12 ------ .../common/clients/TapasMqttClient.java | 41 ++++++++++++++++++ ...ewExecutorToExecutorPoolWebController.java | 8 +++- .../PublishExecutorAddedEventAdapter.java | 43 +++++++++++++++++++ .../port/out/ExecutorAddedEventPort.java | 8 ++++ .../AddNewExecutorToExecutorPoolService.java | 17 ++++++-- .../domain/ExecutorAddedEvent.java | 10 +++++ .../src/main/resources/application.properties | 2 + 10 files changed, 153 insertions(+), 17 deletions(-) create mode 100644 executor-pool/src/main/java/ch/unisg/common/ConfigProperties.java delete mode 100644 executor-pool/src/main/java/ch/unisg/executorpool/TestController.java create mode 100644 executor-pool/src/main/java/ch/unisg/executorpool/adapter/common/clients/TapasMqttClient.java create mode 100644 executor-pool/src/main/java/ch/unisg/executorpool/adapter/out/messaging/PublishExecutorAddedEventAdapter.java create mode 100644 executor-pool/src/main/java/ch/unisg/executorpool/application/port/out/ExecutorAddedEventPort.java create mode 100644 executor-pool/src/main/java/ch/unisg/executorpool/domain/ExecutorAddedEvent.java diff --git a/executor-pool/pom.xml b/executor-pool/pom.xml index 2e75dcc..512235d 100644 --- a/executor-pool/pom.xml +++ b/executor-pool/pom.xml @@ -63,6 +63,12 @@ javax.transaction-api compile + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.5 + compile + diff --git a/executor-pool/src/main/java/ch/unisg/common/ConfigProperties.java b/executor-pool/src/main/java/ch/unisg/common/ConfigProperties.java new file mode 100644 index 0000000..b46bf63 --- /dev/null +++ b/executor-pool/src/main/java/ch/unisg/common/ConfigProperties.java @@ -0,0 +1,23 @@ +package ch.unisg.common; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.env.Environment; +import org.springframework.stereotype.Component; + +import java.net.URI; + +@Component +public class ConfigProperties { + @Autowired + private Environment environment; + + /** + * Retrieves the URI of the WebSub hub. In this project, we use a single WebSub hub, but we could + * use multiple. + * + * @return the URI of the WebSub hub + */ + public URI getMqttBrokerUri() { + return URI.create(environment.getProperty("mqtt.broker.uri")); + } +} diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/TestController.java b/executor-pool/src/main/java/ch/unisg/executorpool/TestController.java deleted file mode 100644 index ca29e09..0000000 --- a/executor-pool/src/main/java/ch/unisg/executorpool/TestController.java +++ /dev/null @@ -1,12 +0,0 @@ -package ch.unisg.executorpool; - -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -@RestController -public class TestController { - @RequestMapping("/") - public String index() { - return "Hello World! Executor Pool"; - } -} diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/adapter/common/clients/TapasMqttClient.java b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/common/clients/TapasMqttClient.java new file mode 100644 index 0000000..0b24b81 --- /dev/null +++ b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/common/clients/TapasMqttClient.java @@ -0,0 +1,41 @@ +package ch.unisg.executorpool.adapter.common.clients; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; + +import java.nio.charset.StandardCharsets; +import java.util.UUID; + +public class TapasMqttClient { + private static final Logger LOGGER = LogManager.getLogger(TapasMqttClient.class); + + private static TapasMqttClient tapasClient = null; + + private MqttClient mqttClient; + private final String mqttClientId; + private final String brokerAddress; + + private TapasMqttClient(String brokerAddress) { + this.mqttClientId = UUID.randomUUID().toString(); + this.brokerAddress = brokerAddress; + } + + public static synchronized TapasMqttClient getInstance(String brokerAddress) { + + if (tapasClient == null) { + tapasClient = new TapasMqttClient(brokerAddress); + } + + return tapasClient; + } + + public void publishMessage(String topic, String payload) throws MqttException { + mqttClient = new org.eclipse.paho.client.mqttv3.MqttClient(brokerAddress, mqttClientId, new MemoryPersistence()); + mqttClient.connect(); + MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8)); + mqttClient.publish(topic, message); + mqttClient.disconnect(); + } +} diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/AddNewExecutorToExecutorPoolWebController.java b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/AddNewExecutorToExecutorPoolWebController.java index 5a2dc09..ff464d3 100644 --- a/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/AddNewExecutorToExecutorPoolWebController.java +++ b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/in/web/AddNewExecutorToExecutorPoolWebController.java @@ -1,5 +1,6 @@ package ch.unisg.executorpool.adapter.in.web; +import ch.unisg.executorpool.adapter.common.clients.TapasMqttClient; import ch.unisg.executorpool.adapter.common.formats.ExecutorJsonRepresentation; import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolUseCase; import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolCommand; @@ -13,6 +14,10 @@ import org.springframework.web.bind.annotation.RestController; import org.springframework.web.server.ResponseStatusException; import javax.validation.ConstraintViolationException; import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.UUID; + +import org.eclipse.paho.client.mqttv3.*; @RestController public class AddNewExecutorToExecutorPoolWebController { @@ -24,7 +29,7 @@ public class AddNewExecutorToExecutorPoolWebController { @PostMapping(path = "/executor-pool/AddExecutor", consumes = {ExecutorJsonRepresentation.EXECUTOR_MEDIA_TYPE}) public ResponseEntity addNewExecutorToExecutorPool(@RequestBody ExecutorJsonRepresentation payload){ - try{ + try { AddNewExecutorToExecutorPoolCommand command = new AddNewExecutorToExecutorPoolCommand( new ExecutorClass.ExecutorUri(URI.create(payload.getExecutorUri())), new ExecutorClass.ExecutorTaskType(payload.getExecutorTaskType()) @@ -36,6 +41,7 @@ public class AddNewExecutorToExecutorPoolWebController { responseHeaders.add(HttpHeaders.CONTENT_TYPE, ExecutorJsonRepresentation.EXECUTOR_MEDIA_TYPE); return new ResponseEntity<>(ExecutorJsonRepresentation.serialize(newExecutor), responseHeaders, HttpStatus.CREATED); + } catch (ConstraintViolationException e){ throw new ResponseStatusException(HttpStatus.BAD_REQUEST, e.getMessage()); } diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/adapter/out/messaging/PublishExecutorAddedEventAdapter.java b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/out/messaging/PublishExecutorAddedEventAdapter.java new file mode 100644 index 0000000..323bcbb --- /dev/null +++ b/executor-pool/src/main/java/ch/unisg/executorpool/adapter/out/messaging/PublishExecutorAddedEventAdapter.java @@ -0,0 +1,43 @@ +package ch.unisg.executorpool.adapter.out.messaging; + +import ch.unisg.common.ConfigProperties; +import ch.unisg.executorpool.adapter.common.clients.TapasMqttClient; +import ch.unisg.executorpool.adapter.common.formats.ExecutorJsonRepresentation; +import ch.unisg.executorpool.application.port.out.ExecutorAddedEventPort; +import ch.unisg.executorpool.domain.ExecutorAddedEvent; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Primary; +import org.springframework.core.env.Environment; +import org.springframework.stereotype.Component; + +import java.net.URI; + +@Component +@Primary +public class PublishExecutorAddedEventAdapter implements ExecutorAddedEventPort { + + private static final Logger LOGGER = LogManager.getLogger(PublishExecutorAddedEventAdapter.class); + + // TODO Can't autowire. Find fix + /* + @Autowired + private ConfigProperties config; + */ + + @Autowired + private Environment environment; + + @Override + public void publishExecutorAddedEvent(ExecutorAddedEvent event){ + try{ + var mqttClient = TapasMqttClient.getInstance(environment.getProperty("mqtt.broker.uri")); + mqttClient.publishMessage("ch/unisg/tapas/executors/added", ExecutorJsonRepresentation.serialize(event.getExecutorClass())); + } + catch (MqttException e){ + LOGGER.error(e.getMessage(), e); + } + } +} diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/application/port/out/ExecutorAddedEventPort.java b/executor-pool/src/main/java/ch/unisg/executorpool/application/port/out/ExecutorAddedEventPort.java new file mode 100644 index 0000000..ad75c75 --- /dev/null +++ b/executor-pool/src/main/java/ch/unisg/executorpool/application/port/out/ExecutorAddedEventPort.java @@ -0,0 +1,8 @@ +package ch.unisg.executorpool.application.port.out; + +import ch.unisg.executorpool.domain.ExecutorAddedEvent; +import org.eclipse.paho.client.mqttv3.MqttException; + +public interface ExecutorAddedEventPort { + void publishExecutorAddedEvent(ExecutorAddedEvent event); +} diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/application/service/AddNewExecutorToExecutorPoolService.java b/executor-pool/src/main/java/ch/unisg/executorpool/application/service/AddNewExecutorToExecutorPoolService.java index 200739b..393024a 100644 --- a/executor-pool/src/main/java/ch/unisg/executorpool/application/service/AddNewExecutorToExecutorPoolService.java +++ b/executor-pool/src/main/java/ch/unisg/executorpool/application/service/AddNewExecutorToExecutorPoolService.java @@ -2,24 +2,33 @@ package ch.unisg.executorpool.application.service; import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolUseCase; import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolCommand; +import ch.unisg.executorpool.application.port.out.ExecutorAddedEventPort; +import ch.unisg.executorpool.domain.ExecutorAddedEvent; import ch.unisg.executorpool.domain.ExecutorClass; import ch.unisg.executorpool.domain.ExecutorPool; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; -import org.yaml.snakeyaml.constructor.DuplicateKeyException; import javax.transaction.Transactional; -import javax.validation.ConstraintViolationException; -@RequiredArgsConstructor @Component @Transactional public class AddNewExecutorToExecutorPoolService implements AddNewExecutorToExecutorPoolUseCase { + private final ExecutorAddedEventPort executorAddedEventPort; + + public AddNewExecutorToExecutorPoolService(ExecutorAddedEventPort executorAddedEventPort){ + this.executorAddedEventPort = executorAddedEventPort; + } + @Override public ExecutorClass addNewExecutorToExecutorPool(AddNewExecutorToExecutorPoolCommand command){ ExecutorPool executorPool = ExecutorPool.getExecutorPool(); + var newExecutor = executorPool.addNewExecutor(command.getExecutorUri(), command.getExecutorTaskType()); - return executorPool.addNewExecutor(command.getExecutorUri(), command.getExecutorTaskType()); + var executorAddedEvent = new ExecutorAddedEvent(newExecutor); + executorAddedEventPort.publishExecutorAddedEvent(executorAddedEvent); + + return newExecutor; } } diff --git a/executor-pool/src/main/java/ch/unisg/executorpool/domain/ExecutorAddedEvent.java b/executor-pool/src/main/java/ch/unisg/executorpool/domain/ExecutorAddedEvent.java new file mode 100644 index 0000000..6ec291e --- /dev/null +++ b/executor-pool/src/main/java/ch/unisg/executorpool/domain/ExecutorAddedEvent.java @@ -0,0 +1,10 @@ +package ch.unisg.executorpool.domain; + +import lombok.Getter; + +public class ExecutorAddedEvent { + @Getter + private ExecutorClass executorClass; + + public ExecutorAddedEvent(ExecutorClass executorClass) { this.executorClass = executorClass; } +} diff --git a/executor-pool/src/main/resources/application.properties b/executor-pool/src/main/resources/application.properties index 8f91ca7..0c9ba7e 100644 --- a/executor-pool/src/main/resources/application.properties +++ b/executor-pool/src/main/resources/application.properties @@ -1 +1,3 @@ server.port=8083 + +mqtt.broker.uri=tcp://localhost:1883