Implemented the new executor added event over mqtt

This commit is contained in:
reynisson 2021-11-14 14:28:45 +01:00
parent 59795d0234
commit 55c094fc56
10 changed files with 153 additions and 17 deletions

View File

@ -63,6 +63,12 @@
<artifactId>javax.transaction-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,23 @@
package ch.unisg.common;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import java.net.URI;
@Component
public class ConfigProperties {
@Autowired
private Environment environment;
/**
* Retrieves the URI of the WebSub hub. In this project, we use a single WebSub hub, but we could
* use multiple.
*
* @return the URI of the WebSub hub
*/
public URI getMqttBrokerUri() {
return URI.create(environment.getProperty("mqtt.broker.uri"));
}
}

View File

@ -1,12 +0,0 @@
package ch.unisg.executorpool;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TestController {
@RequestMapping("/")
public String index() {
return "Hello World! Executor Pool";
}
}

View File

@ -0,0 +1,41 @@
package ch.unisg.executorpool.adapter.common.clients;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
public class TapasMqttClient {
private static final Logger LOGGER = LogManager.getLogger(TapasMqttClient.class);
private static TapasMqttClient tapasClient = null;
private MqttClient mqttClient;
private final String mqttClientId;
private final String brokerAddress;
private TapasMqttClient(String brokerAddress) {
this.mqttClientId = UUID.randomUUID().toString();
this.brokerAddress = brokerAddress;
}
public static synchronized TapasMqttClient getInstance(String brokerAddress) {
if (tapasClient == null) {
tapasClient = new TapasMqttClient(brokerAddress);
}
return tapasClient;
}
public void publishMessage(String topic, String payload) throws MqttException {
mqttClient = new org.eclipse.paho.client.mqttv3.MqttClient(brokerAddress, mqttClientId, new MemoryPersistence());
mqttClient.connect();
MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
mqttClient.publish(topic, message);
mqttClient.disconnect();
}
}

View File

@ -1,5 +1,6 @@
package ch.unisg.executorpool.adapter.in.web;
import ch.unisg.executorpool.adapter.common.clients.TapasMqttClient;
import ch.unisg.executorpool.adapter.common.formats.ExecutorJsonRepresentation;
import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolUseCase;
import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolCommand;
@ -13,6 +14,10 @@ import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ResponseStatusException;
import javax.validation.ConstraintViolationException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import org.eclipse.paho.client.mqttv3.*;
@RestController
public class AddNewExecutorToExecutorPoolWebController {
@ -36,6 +41,7 @@ public class AddNewExecutorToExecutorPoolWebController {
responseHeaders.add(HttpHeaders.CONTENT_TYPE, ExecutorJsonRepresentation.EXECUTOR_MEDIA_TYPE);
return new ResponseEntity<>(ExecutorJsonRepresentation.serialize(newExecutor), responseHeaders, HttpStatus.CREATED);
} catch (ConstraintViolationException e){
throw new ResponseStatusException(HttpStatus.BAD_REQUEST, e.getMessage());
}

View File

@ -0,0 +1,43 @@
package ch.unisg.executorpool.adapter.out.messaging;
import ch.unisg.common.ConfigProperties;
import ch.unisg.executorpool.adapter.common.clients.TapasMqttClient;
import ch.unisg.executorpool.adapter.common.formats.ExecutorJsonRepresentation;
import ch.unisg.executorpool.application.port.out.ExecutorAddedEventPort;
import ch.unisg.executorpool.domain.ExecutorAddedEvent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import java.net.URI;
@Component
@Primary
public class PublishExecutorAddedEventAdapter implements ExecutorAddedEventPort {
private static final Logger LOGGER = LogManager.getLogger(PublishExecutorAddedEventAdapter.class);
// TODO Can't autowire. Find fix
/*
@Autowired
private ConfigProperties config;
*/
@Autowired
private Environment environment;
@Override
public void publishExecutorAddedEvent(ExecutorAddedEvent event){
try{
var mqttClient = TapasMqttClient.getInstance(environment.getProperty("mqtt.broker.uri"));
mqttClient.publishMessage("ch/unisg/tapas/executors/added", ExecutorJsonRepresentation.serialize(event.getExecutorClass()));
}
catch (MqttException e){
LOGGER.error(e.getMessage(), e);
}
}
}

View File

@ -0,0 +1,8 @@
package ch.unisg.executorpool.application.port.out;
import ch.unisg.executorpool.domain.ExecutorAddedEvent;
import org.eclipse.paho.client.mqttv3.MqttException;
public interface ExecutorAddedEventPort {
void publishExecutorAddedEvent(ExecutorAddedEvent event);
}

View File

@ -2,24 +2,33 @@ package ch.unisg.executorpool.application.service;
import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolUseCase;
import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolCommand;
import ch.unisg.executorpool.application.port.out.ExecutorAddedEventPort;
import ch.unisg.executorpool.domain.ExecutorAddedEvent;
import ch.unisg.executorpool.domain.ExecutorClass;
import ch.unisg.executorpool.domain.ExecutorPool;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import org.yaml.snakeyaml.constructor.DuplicateKeyException;
import javax.transaction.Transactional;
import javax.validation.ConstraintViolationException;
@RequiredArgsConstructor
@Component
@Transactional
public class AddNewExecutorToExecutorPoolService implements AddNewExecutorToExecutorPoolUseCase {
private final ExecutorAddedEventPort executorAddedEventPort;
public AddNewExecutorToExecutorPoolService(ExecutorAddedEventPort executorAddedEventPort){
this.executorAddedEventPort = executorAddedEventPort;
}
@Override
public ExecutorClass addNewExecutorToExecutorPool(AddNewExecutorToExecutorPoolCommand command){
ExecutorPool executorPool = ExecutorPool.getExecutorPool();
var newExecutor = executorPool.addNewExecutor(command.getExecutorUri(), command.getExecutorTaskType());
return executorPool.addNewExecutor(command.getExecutorUri(), command.getExecutorTaskType());
var executorAddedEvent = new ExecutorAddedEvent(newExecutor);
executorAddedEventPort.publishExecutorAddedEvent(executorAddedEvent);
return newExecutor;
}
}

View File

@ -0,0 +1,10 @@
package ch.unisg.executorpool.domain;
import lombok.Getter;
public class ExecutorAddedEvent {
@Getter
private ExecutorClass executorClass;
public ExecutorAddedEvent(ExecutorClass executorClass) { this.executorClass = executorClass; }
}

View File

@ -1 +1,3 @@
server.port=8083
mqtt.broker.uri=tcp://localhost:1883