Dev #65
|
@ -63,6 +63,12 @@
|
||||||
<artifactId>javax.transaction-api</artifactId>
|
<artifactId>javax.transaction-api</artifactId>
|
||||||
<scope>compile</scope>
|
<scope>compile</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.eclipse.paho</groupId>
|
||||||
|
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
|
||||||
|
<version>1.2.5</version>
|
||||||
|
<scope>compile</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|
|
@ -0,0 +1,23 @@
|
||||||
|
package ch.unisg.common;
|
||||||
|
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.core.env.Environment;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class ConfigProperties {
|
||||||
|
@Autowired
|
||||||
|
private Environment environment;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retrieves the URI of the WebSub hub. In this project, we use a single WebSub hub, but we could
|
||||||
|
* use multiple.
|
||||||
|
*
|
||||||
|
* @return the URI of the WebSub hub
|
||||||
|
*/
|
||||||
|
public URI getMqttBrokerUri() {
|
||||||
|
return URI.create(environment.getProperty("mqtt.broker.uri"));
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,12 +0,0 @@
|
||||||
package ch.unisg.executorpool;
|
|
||||||
|
|
||||||
import org.springframework.web.bind.annotation.RequestMapping;
|
|
||||||
import org.springframework.web.bind.annotation.RestController;
|
|
||||||
|
|
||||||
@RestController
|
|
||||||
public class TestController {
|
|
||||||
@RequestMapping("/")
|
|
||||||
public String index() {
|
|
||||||
return "Hello World! Executor Pool";
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,41 @@
|
||||||
|
package ch.unisg.executorpool.adapter.common.clients;
|
||||||
|
|
||||||
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
import org.eclipse.paho.client.mqttv3.*;
|
||||||
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
public class TapasMqttClient {
|
||||||
|
private static final Logger LOGGER = LogManager.getLogger(TapasMqttClient.class);
|
||||||
|
|
||||||
|
private static TapasMqttClient tapasClient = null;
|
||||||
|
|
||||||
|
private MqttClient mqttClient;
|
||||||
|
private final String mqttClientId;
|
||||||
|
private final String brokerAddress;
|
||||||
|
|
||||||
|
private TapasMqttClient(String brokerAddress) {
|
||||||
|
this.mqttClientId = UUID.randomUUID().toString();
|
||||||
|
this.brokerAddress = brokerAddress;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static synchronized TapasMqttClient getInstance(String brokerAddress) {
|
||||||
|
|
||||||
|
if (tapasClient == null) {
|
||||||
|
tapasClient = new TapasMqttClient(brokerAddress);
|
||||||
|
}
|
||||||
|
|
||||||
|
return tapasClient;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void publishMessage(String topic, String payload) throws MqttException {
|
||||||
|
mqttClient = new org.eclipse.paho.client.mqttv3.MqttClient(brokerAddress, mqttClientId, new MemoryPersistence());
|
||||||
|
mqttClient.connect();
|
||||||
|
MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
|
||||||
|
mqttClient.publish(topic, message);
|
||||||
|
mqttClient.disconnect();
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,5 +1,6 @@
|
||||||
package ch.unisg.executorpool.adapter.in.web;
|
package ch.unisg.executorpool.adapter.in.web;
|
||||||
|
|
||||||
|
import ch.unisg.executorpool.adapter.common.clients.TapasMqttClient;
|
||||||
import ch.unisg.executorpool.adapter.common.formats.ExecutorJsonRepresentation;
|
import ch.unisg.executorpool.adapter.common.formats.ExecutorJsonRepresentation;
|
||||||
import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolUseCase;
|
import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolUseCase;
|
||||||
import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolCommand;
|
import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolCommand;
|
||||||
|
@ -13,6 +14,10 @@ import org.springframework.web.bind.annotation.RestController;
|
||||||
import org.springframework.web.server.ResponseStatusException;
|
import org.springframework.web.server.ResponseStatusException;
|
||||||
import javax.validation.ConstraintViolationException;
|
import javax.validation.ConstraintViolationException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import org.eclipse.paho.client.mqttv3.*;
|
||||||
|
|
||||||
@RestController
|
@RestController
|
||||||
public class AddNewExecutorToExecutorPoolWebController {
|
public class AddNewExecutorToExecutorPoolWebController {
|
||||||
|
@ -24,7 +29,7 @@ public class AddNewExecutorToExecutorPoolWebController {
|
||||||
|
|
||||||
@PostMapping(path = "/executor-pool/AddExecutor", consumes = {ExecutorJsonRepresentation.EXECUTOR_MEDIA_TYPE})
|
@PostMapping(path = "/executor-pool/AddExecutor", consumes = {ExecutorJsonRepresentation.EXECUTOR_MEDIA_TYPE})
|
||||||
public ResponseEntity<String> addNewExecutorToExecutorPool(@RequestBody ExecutorJsonRepresentation payload){
|
public ResponseEntity<String> addNewExecutorToExecutorPool(@RequestBody ExecutorJsonRepresentation payload){
|
||||||
try{
|
try {
|
||||||
AddNewExecutorToExecutorPoolCommand command = new AddNewExecutorToExecutorPoolCommand(
|
AddNewExecutorToExecutorPoolCommand command = new AddNewExecutorToExecutorPoolCommand(
|
||||||
new ExecutorClass.ExecutorUri(URI.create(payload.getExecutorUri())),
|
new ExecutorClass.ExecutorUri(URI.create(payload.getExecutorUri())),
|
||||||
new ExecutorClass.ExecutorTaskType(payload.getExecutorTaskType())
|
new ExecutorClass.ExecutorTaskType(payload.getExecutorTaskType())
|
||||||
|
@ -36,6 +41,7 @@ public class AddNewExecutorToExecutorPoolWebController {
|
||||||
responseHeaders.add(HttpHeaders.CONTENT_TYPE, ExecutorJsonRepresentation.EXECUTOR_MEDIA_TYPE);
|
responseHeaders.add(HttpHeaders.CONTENT_TYPE, ExecutorJsonRepresentation.EXECUTOR_MEDIA_TYPE);
|
||||||
|
|
||||||
return new ResponseEntity<>(ExecutorJsonRepresentation.serialize(newExecutor), responseHeaders, HttpStatus.CREATED);
|
return new ResponseEntity<>(ExecutorJsonRepresentation.serialize(newExecutor), responseHeaders, HttpStatus.CREATED);
|
||||||
|
|
||||||
} catch (ConstraintViolationException e){
|
} catch (ConstraintViolationException e){
|
||||||
throw new ResponseStatusException(HttpStatus.BAD_REQUEST, e.getMessage());
|
throw new ResponseStatusException(HttpStatus.BAD_REQUEST, e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,43 @@
|
||||||
|
package ch.unisg.executorpool.adapter.out.messaging;
|
||||||
|
|
||||||
|
import ch.unisg.common.ConfigProperties;
|
||||||
|
import ch.unisg.executorpool.adapter.common.clients.TapasMqttClient;
|
||||||
|
import ch.unisg.executorpool.adapter.common.formats.ExecutorJsonRepresentation;
|
||||||
|
import ch.unisg.executorpool.application.port.out.ExecutorAddedEventPort;
|
||||||
|
import ch.unisg.executorpool.domain.ExecutorAddedEvent;
|
||||||
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.context.annotation.Primary;
|
||||||
|
import org.springframework.core.env.Environment;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@Primary
|
||||||
|
public class PublishExecutorAddedEventAdapter implements ExecutorAddedEventPort {
|
||||||
|
|
||||||
|
private static final Logger LOGGER = LogManager.getLogger(PublishExecutorAddedEventAdapter.class);
|
||||||
|
|
||||||
|
// TODO Can't autowire. Find fix
|
||||||
|
/*
|
||||||
|
@Autowired
|
||||||
|
private ConfigProperties config;
|
||||||
|
*/
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private Environment environment;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void publishExecutorAddedEvent(ExecutorAddedEvent event){
|
||||||
|
try{
|
||||||
|
var mqttClient = TapasMqttClient.getInstance(environment.getProperty("mqtt.broker.uri"));
|
||||||
|
mqttClient.publishMessage("ch/unisg/tapas/executors/added", ExecutorJsonRepresentation.serialize(event.getExecutorClass()));
|
||||||
|
}
|
||||||
|
catch (MqttException e){
|
||||||
|
LOGGER.error(e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,8 @@
|
||||||
|
package ch.unisg.executorpool.application.port.out;
|
||||||
|
|
||||||
|
import ch.unisg.executorpool.domain.ExecutorAddedEvent;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||||
|
|
||||||
|
public interface ExecutorAddedEventPort {
|
||||||
|
void publishExecutorAddedEvent(ExecutorAddedEvent event);
|
||||||
|
}
|
|
@ -2,24 +2,33 @@ package ch.unisg.executorpool.application.service;
|
||||||
|
|
||||||
import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolUseCase;
|
import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolUseCase;
|
||||||
import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolCommand;
|
import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolCommand;
|
||||||
|
import ch.unisg.executorpool.application.port.out.ExecutorAddedEventPort;
|
||||||
|
import ch.unisg.executorpool.domain.ExecutorAddedEvent;
|
||||||
import ch.unisg.executorpool.domain.ExecutorClass;
|
import ch.unisg.executorpool.domain.ExecutorClass;
|
||||||
import ch.unisg.executorpool.domain.ExecutorPool;
|
import ch.unisg.executorpool.domain.ExecutorPool;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import org.yaml.snakeyaml.constructor.DuplicateKeyException;
|
|
||||||
|
|
||||||
import javax.transaction.Transactional;
|
import javax.transaction.Transactional;
|
||||||
import javax.validation.ConstraintViolationException;
|
|
||||||
|
|
||||||
@RequiredArgsConstructor
|
|
||||||
@Component
|
@Component
|
||||||
@Transactional
|
@Transactional
|
||||||
public class AddNewExecutorToExecutorPoolService implements AddNewExecutorToExecutorPoolUseCase {
|
public class AddNewExecutorToExecutorPoolService implements AddNewExecutorToExecutorPoolUseCase {
|
||||||
|
|
||||||
|
private final ExecutorAddedEventPort executorAddedEventPort;
|
||||||
|
|
||||||
|
public AddNewExecutorToExecutorPoolService(ExecutorAddedEventPort executorAddedEventPort){
|
||||||
|
this.executorAddedEventPort = executorAddedEventPort;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ExecutorClass addNewExecutorToExecutorPool(AddNewExecutorToExecutorPoolCommand command){
|
public ExecutorClass addNewExecutorToExecutorPool(AddNewExecutorToExecutorPoolCommand command){
|
||||||
ExecutorPool executorPool = ExecutorPool.getExecutorPool();
|
ExecutorPool executorPool = ExecutorPool.getExecutorPool();
|
||||||
|
var newExecutor = executorPool.addNewExecutor(command.getExecutorUri(), command.getExecutorTaskType());
|
||||||
|
|
||||||
return executorPool.addNewExecutor(command.getExecutorUri(), command.getExecutorTaskType());
|
var executorAddedEvent = new ExecutorAddedEvent(newExecutor);
|
||||||
|
executorAddedEventPort.publishExecutorAddedEvent(executorAddedEvent);
|
||||||
|
|
||||||
|
return newExecutor;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,10 @@
|
||||||
|
package ch.unisg.executorpool.domain;
|
||||||
|
|
||||||
|
import lombok.Getter;
|
||||||
|
|
||||||
|
public class ExecutorAddedEvent {
|
||||||
|
@Getter
|
||||||
|
private ExecutorClass executorClass;
|
||||||
|
|
||||||
|
public ExecutorAddedEvent(ExecutorClass executorClass) { this.executorClass = executorClass; }
|
||||||
|
}
|
|
@ -1 +1,3 @@
|
||||||
server.port=8083
|
server.port=8083
|
||||||
|
|
||||||
|
mqtt.broker.uri=tcp://localhost:1883
|
||||||
|
|
Loading…
Reference in New Issue
Block a user