Auctionhouse websub #63

Merged
Maece97 merged 5 commits from auctionhouse-websub into dev 2021-11-14 21:16:15 +00:00
48 changed files with 532 additions and 256 deletions
Showing only changes of commit afb534b0e1 - Show all commits

View File

@ -63,6 +63,12 @@
<artifactId>javax.transaction-api</artifactId> <artifactId>javax.transaction-api</artifactId>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
<scope>compile</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -0,0 +1,22 @@
package ch.unisg.common;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import java.net.URI;
@Component
public class ConfigProperties {
@Autowired
private Environment environment;
/**
* Retrieves the URI of the MQTT broker.
*
* @return the URI of the MQTT broker
*/
public URI getMqttBrokerUri() {
return URI.create(environment.getProperty("mqtt.broker.uri"));
}
}

View File

@ -1,12 +0,0 @@
package ch.unisg.executorpool;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TestController {
@RequestMapping("/")
public String index() {
return "Hello World! Executor Pool";
}
}

View File

@ -0,0 +1,41 @@
package ch.unisg.executorpool.adapter.common.clients;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
public class TapasMqttClient {
private static final Logger LOGGER = LogManager.getLogger(TapasMqttClient.class);
private static TapasMqttClient tapasClient = null;
private MqttClient mqttClient;
private final String mqttClientId;
private final String brokerAddress;
private TapasMqttClient(String brokerAddress) {
this.mqttClientId = UUID.randomUUID().toString();
this.brokerAddress = brokerAddress;
}
public static synchronized TapasMqttClient getInstance(String brokerAddress) {
if (tapasClient == null) {
tapasClient = new TapasMqttClient(brokerAddress);
}
return tapasClient;
}
public void publishMessage(String topic, String payload) throws MqttException {
mqttClient = new org.eclipse.paho.client.mqttv3.MqttClient(brokerAddress, mqttClientId, new MemoryPersistence());
mqttClient.connect();
MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
mqttClient.publish(topic, message);
mqttClient.disconnect();
}
}

View File

@ -0,0 +1,51 @@
package ch.unisg.executorpool.adapter.common.formats;
import ch.unisg.executorpool.domain.ExecutorClass;
import lombok.Getter;
import lombok.Setter;
import org.json.JSONArray;
import org.json.JSONObject;
import java.util.List;
public class ExecutorJsonRepresentation {
public static final String EXECUTOR_MEDIA_TYPE = "application/json";
@Getter @Setter
private String executorUri;
@Getter @Setter
private String executorTaskType;
// TODO Check if this need Setters. Also applies to AuctionJsonRepresentation
public ExecutorJsonRepresentation(String executorUri, String executorTaskType){
this.executorUri = executorUri;
this.executorTaskType = executorTaskType;
}
public static String serialize(ExecutorClass executorClass) {
JSONObject payload = new JSONObject();
payload.put("executorUri", executorClass.getExecutorUri().getValue());
payload.put("executorTaskType", executorClass.getExecutorTaskType().getValue());
return payload.toString();
}
public static String serialize(List<ExecutorClass> listOfExecutors) {
JSONArray jsonArray = new JSONArray();
for (ExecutorClass executor: listOfExecutors) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("executorUri", executor.getExecutorUri().getValue());
jsonObject.put("executorTaskType", executor.getExecutorTaskType().getValue());
jsonArray.put(jsonObject);
}
return jsonArray.toString();
}
private ExecutorJsonRepresentation() { }
}

View File

@ -1,5 +1,7 @@
package ch.unisg.executorpool.adapter.in.web; package ch.unisg.executorpool.adapter.in.web;
import ch.unisg.executorpool.adapter.common.clients.TapasMqttClient;
import ch.unisg.executorpool.adapter.common.formats.ExecutorJsonRepresentation;
import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolUseCase; import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolUseCase;
import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolCommand; import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolCommand;
import ch.unisg.executorpool.domain.ExecutorClass; import ch.unisg.executorpool.domain.ExecutorClass;
@ -11,6 +13,11 @@ import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ResponseStatusException; import org.springframework.web.server.ResponseStatusException;
import javax.validation.ConstraintViolationException; import javax.validation.ConstraintViolationException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import org.eclipse.paho.client.mqttv3.*;
@RestController @RestController
public class AddNewExecutorToExecutorPoolWebController { public class AddNewExecutorToExecutorPoolWebController {
@ -20,19 +27,21 @@ public class AddNewExecutorToExecutorPoolWebController {
this.addNewExecutorToExecutorPoolUseCase = addNewExecutorToExecutorPoolUseCase; this.addNewExecutorToExecutorPoolUseCase = addNewExecutorToExecutorPoolUseCase;
} }
@PostMapping(path = "/executor-pool/AddExecutor", consumes = {ExecutorMediaType.EXECUTOR_MEDIA_TYPE}) @PostMapping(path = "/executor-pool/AddExecutor", consumes = {ExecutorJsonRepresentation.EXECUTOR_MEDIA_TYPE})
public ResponseEntity<String> addNewExecutorToExecutorPool(@RequestBody ExecutorClass executorClass){ public ResponseEntity<String> addNewExecutorToExecutorPool(@RequestBody ExecutorJsonRepresentation payload){
try { try {
AddNewExecutorToExecutorPoolCommand command = new AddNewExecutorToExecutorPoolCommand( AddNewExecutorToExecutorPoolCommand command = new AddNewExecutorToExecutorPoolCommand(
executorClass.getExecutorIp(), executorClass.getExecutorPort(), executorClass.getExecutorTaskType() new ExecutorClass.ExecutorUri(URI.create(payload.getExecutorUri())),
new ExecutorClass.ExecutorTaskType(payload.getExecutorTaskType())
); );
ExecutorClass newExecutor = addNewExecutorToExecutorPoolUseCase.addNewExecutorToExecutorPool(command); ExecutorClass newExecutor = addNewExecutorToExecutorPoolUseCase.addNewExecutorToExecutorPool(command);
HttpHeaders responseHeaders = new HttpHeaders(); HttpHeaders responseHeaders = new HttpHeaders();
responseHeaders.add(HttpHeaders.CONTENT_TYPE, ExecutorMediaType.EXECUTOR_MEDIA_TYPE); responseHeaders.add(HttpHeaders.CONTENT_TYPE, ExecutorJsonRepresentation.EXECUTOR_MEDIA_TYPE);
return new ResponseEntity<>(ExecutorJsonRepresentation.serialize(newExecutor), responseHeaders, HttpStatus.CREATED);
return new ResponseEntity<>(ExecutorMediaType.serialize(newExecutor), responseHeaders, HttpStatus.CREATED);
} catch (ConstraintViolationException e){ } catch (ConstraintViolationException e){
throw new ResponseStatusException(HttpStatus.BAD_REQUEST, e.getMessage()); throw new ResponseStatusException(HttpStatus.BAD_REQUEST, e.getMessage());
} }

View File

@ -1,38 +0,0 @@
package ch.unisg.executorpool.adapter.in.web;
import ch.unisg.executorpool.domain.ExecutorClass;
import org.json.JSONArray;
import org.json.JSONObject;
import java.util.List;
final public class ExecutorMediaType {
public static final String EXECUTOR_MEDIA_TYPE = "application/json";
public static String serialize(ExecutorClass executorClass) {
JSONObject payload = new JSONObject();
payload.put("executorIp", executorClass.getExecutorIp().getValue());
payload.put("executorPort", executorClass.getExecutorPort().getValue());
payload.put("executorTaskType", executorClass.getExecutorTaskType().getValue());
return payload.toString();
}
public static String serialize(List<ExecutorClass> listOfExecutors) {
String serializedList = "[ \n";
for (ExecutorClass executor: listOfExecutors) {
serializedList += serialize(executor) + "\n";
}
// return serializedList + "\n ]";
JSONArray jsonArray = new JSONArray();
JSONObject jsonObject = new JSONObject();
jsonObject.put("executorIp", "localhost");
jsonArray.put(jsonObject);
return jsonArray.toString();
}
private ExecutorMediaType() { }
}

View File

@ -1,35 +0,0 @@
package ch.unisg.executorpool.adapter.in.web;
import ch.unisg.executorpool.application.port.in.GetAllExecutorInExecutorPoolByTypeQuery;
import ch.unisg.executorpool.application.port.in.GetAllExecutorInExecutorPoolByTypeUseCase;
import ch.unisg.executorpool.domain.ExecutorClass;
import ch.unisg.executorpool.domain.ExecutorClass.ExecutorTaskType;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RestController
public class GetAllExecutorInExecutorPoolByTypeWebController {
private final GetAllExecutorInExecutorPoolByTypeUseCase getAllExecutorInExecutorPoolByTypeUseCase;
public GetAllExecutorInExecutorPoolByTypeWebController(GetAllExecutorInExecutorPoolByTypeUseCase getAllExecutorInExecutorPoolByTypeUseCase){
this.getAllExecutorInExecutorPoolByTypeUseCase = getAllExecutorInExecutorPoolByTypeUseCase;
}
@GetMapping(path = "/executor-pool/GetAllExecutorInExecutorPoolByType/{taskType}")
public ResponseEntity<String> getAllExecutorInExecutorPoolByType(@PathVariable("taskType") String taskType){
GetAllExecutorInExecutorPoolByTypeQuery query = new GetAllExecutorInExecutorPoolByTypeQuery(new ExecutorTaskType(taskType));
List<ExecutorClass> matchedExecutors = getAllExecutorInExecutorPoolByTypeUseCase.getAllExecutorInExecutorPoolByType(query);
// Add the content type as a response header
HttpHeaders responseHeaders = new HttpHeaders();
responseHeaders.add(HttpHeaders.CONTENT_TYPE, ExecutorMediaType.EXECUTOR_MEDIA_TYPE);
return new ResponseEntity<>(ExecutorMediaType.serialize(matchedExecutors), responseHeaders, HttpStatus.OK);
}
}

View File

@ -0,0 +1,36 @@
package ch.unisg.executorpool.adapter.in.web;
import ch.unisg.executorpool.adapter.common.formats.ExecutorJsonRepresentation;
import ch.unisg.executorpool.application.port.in.GetAllExecutorsInExecutorPoolByTypeQuery;
import ch.unisg.executorpool.application.port.in.GetAllExecutorsInExecutorPoolByTypeUseCase;
import ch.unisg.executorpool.domain.ExecutorClass;
import ch.unisg.executorpool.domain.ExecutorClass.ExecutorTaskType;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RestController
public class GetAllExecutorsInExecutorPoolByTypeWebController {
private final GetAllExecutorsInExecutorPoolByTypeUseCase getAllExecutorsInExecutorPoolByTypeUseCase;
public GetAllExecutorsInExecutorPoolByTypeWebController(GetAllExecutorsInExecutorPoolByTypeUseCase getAllExecutorInExecutorPoolByTypeUseCase){
this.getAllExecutorsInExecutorPoolByTypeUseCase = getAllExecutorInExecutorPoolByTypeUseCase;
}
@GetMapping(path = "/executor-pool/GetAllExecutorsInExecutorPoolByType/{taskType}")
public ResponseEntity<String> getAllExecutorInExecutorPoolByType(@PathVariable("taskType") String taskType){
GetAllExecutorsInExecutorPoolByTypeQuery query = new GetAllExecutorsInExecutorPoolByTypeQuery(new ExecutorTaskType(taskType));
List<ExecutorClass> matchedExecutors = getAllExecutorsInExecutorPoolByTypeUseCase.getAllExecutorsInExecutorPoolByType(query);
// Add the content type as a response header
HttpHeaders responseHeaders = new HttpHeaders();
responseHeaders.add(HttpHeaders.CONTENT_TYPE, ExecutorJsonRepresentation.EXECUTOR_MEDIA_TYPE);
return new ResponseEntity<>(ExecutorJsonRepresentation.serialize(matchedExecutors), responseHeaders, HttpStatus.OK);
}
}

View File

@ -1,5 +1,6 @@
package ch.unisg.executorpool.adapter.in.web; package ch.unisg.executorpool.adapter.in.web;
import ch.unisg.executorpool.adapter.common.formats.ExecutorJsonRepresentation;
import ch.unisg.executorpool.application.port.in.GetAllExecutorsInExecutorPoolUseCase; import ch.unisg.executorpool.application.port.in.GetAllExecutorsInExecutorPoolUseCase;
import ch.unisg.executorpool.domain.ExecutorClass; import ch.unisg.executorpool.domain.ExecutorClass;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
@ -24,8 +25,8 @@ public class GetAllExecutorsInExecutorPoolWebController {
// Add the content type as a response header // Add the content type as a response header
HttpHeaders responseHeaders = new HttpHeaders(); HttpHeaders responseHeaders = new HttpHeaders();
responseHeaders.add(HttpHeaders.CONTENT_TYPE, ExecutorMediaType.EXECUTOR_MEDIA_TYPE); responseHeaders.add(HttpHeaders.CONTENT_TYPE, ExecutorJsonRepresentation.EXECUTOR_MEDIA_TYPE);
return new ResponseEntity<>(ExecutorMediaType.serialize(executorClassList), responseHeaders, HttpStatus.OK); return new ResponseEntity<>(ExecutorJsonRepresentation.serialize(executorClassList), responseHeaders, HttpStatus.OK);
} }
} }

View File

@ -1,5 +1,6 @@
package ch.unisg.executorpool.adapter.in.web; package ch.unisg.executorpool.adapter.in.web;
import ch.unisg.executorpool.adapter.common.formats.ExecutorJsonRepresentation;
import ch.unisg.executorpool.application.port.in.RemoveExecutorFromExecutorPoolCommand; import ch.unisg.executorpool.application.port.in.RemoveExecutorFromExecutorPoolCommand;
import ch.unisg.executorpool.application.port.in.RemoveExecutorFromExecutorPoolUseCase; import ch.unisg.executorpool.application.port.in.RemoveExecutorFromExecutorPoolUseCase;
import ch.unisg.executorpool.domain.ExecutorClass; import ch.unisg.executorpool.domain.ExecutorClass;
@ -11,6 +12,7 @@ import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ResponseStatusException; import org.springframework.web.server.ResponseStatusException;
import java.net.URI;
import java.util.Optional; import java.util.Optional;
@RestController @RestController
@ -21,9 +23,11 @@ public class RemoveExecutorFromExecutorPoolWebController {
this.removeExecutorFromExecutorPoolUseCase = removeExecutorFromExecutorPoolUseCase; this.removeExecutorFromExecutorPoolUseCase = removeExecutorFromExecutorPoolUseCase;
} }
@PostMapping(path = "/executor-pool/RemoveExecutor", consumes = {ExecutorMediaType.EXECUTOR_MEDIA_TYPE}) @PostMapping(path = "/executor-pool/RemoveExecutor", consumes = {ExecutorJsonRepresentation.EXECUTOR_MEDIA_TYPE})
public ResponseEntity<String> removeExecutorFromExecutorPool(@RequestBody ExecutorClass executorClass){ public ResponseEntity<String> removeExecutorFromExecutorPool(@RequestBody ExecutorJsonRepresentation executorJsonRepresentation){
RemoveExecutorFromExecutorPoolCommand command = new RemoveExecutorFromExecutorPoolCommand(executorClass.getExecutorIp(), executorClass.getExecutorPort()); RemoveExecutorFromExecutorPoolCommand command = new RemoveExecutorFromExecutorPoolCommand(
new ExecutorClass.ExecutorUri(URI.create(executorJsonRepresentation.getExecutorUri()))
);
Optional<ExecutorClass> removedExecutor = removeExecutorFromExecutorPoolUseCase.removeExecutorFromExecutorPool(command); Optional<ExecutorClass> removedExecutor = removeExecutorFromExecutorPoolUseCase.removeExecutorFromExecutorPool(command);
if(removedExecutor.isEmpty()){ if(removedExecutor.isEmpty()){
@ -31,9 +35,9 @@ public class RemoveExecutorFromExecutorPoolWebController {
} }
HttpHeaders responseHeaders = new HttpHeaders(); HttpHeaders responseHeaders = new HttpHeaders();
responseHeaders.add(HttpHeaders.CONTENT_TYPE, ExecutorMediaType.EXECUTOR_MEDIA_TYPE); responseHeaders.add(HttpHeaders.CONTENT_TYPE, ExecutorJsonRepresentation.EXECUTOR_MEDIA_TYPE);
return new ResponseEntity<>(ExecutorMediaType.serialize(removedExecutor.get()), responseHeaders, return new ResponseEntity<>(ExecutorJsonRepresentation.serialize(removedExecutor.get()), responseHeaders,
HttpStatus.OK); HttpStatus.OK);
} }
} }

View File

@ -0,0 +1,43 @@
package ch.unisg.executorpool.adapter.out.messaging;
import ch.unisg.common.ConfigProperties;
import ch.unisg.executorpool.adapter.common.clients.TapasMqttClient;
import ch.unisg.executorpool.adapter.common.formats.ExecutorJsonRepresentation;
import ch.unisg.executorpool.application.port.out.ExecutorAddedEventPort;
import ch.unisg.executorpool.domain.ExecutorAddedEvent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import java.net.URI;
@Component
@Primary
public class PublishExecutorAddedEventAdapter implements ExecutorAddedEventPort {
private static final Logger LOGGER = LogManager.getLogger(PublishExecutorAddedEventAdapter.class);
// TODO Can't autowire. Find fix
/*
@Autowired
private ConfigProperties config;
*/
@Autowired
private Environment environment;
@Override
public void publishExecutorAddedEvent(ExecutorAddedEvent event){
try{
var mqttClient = TapasMqttClient.getInstance(environment.getProperty("mqtt.broker.uri"));
mqttClient.publishMessage("ch/unisg/tapas/executors/added", ExecutorJsonRepresentation.serialize(event.getExecutorClass()));
}
catch (MqttException e){
LOGGER.error(e.getMessage(), e);
}
}
}

View File

@ -0,0 +1,41 @@
package ch.unisg.executorpool.adapter.out.messaging;
import ch.unisg.executorpool.adapter.common.clients.TapasMqttClient;
import ch.unisg.executorpool.adapter.common.formats.ExecutorJsonRepresentation;
import ch.unisg.executorpool.application.port.out.ExecutorRemovedEventPort;
import ch.unisg.executorpool.domain.ExecutorAddedEvent;
import ch.unisg.executorpool.domain.ExecutorRemovedEvent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
@Component
@Primary
public class PublishExecutorRemovedEventAdapter implements ExecutorRemovedEventPort {
private static final Logger LOGGER = LogManager.getLogger(PublishExecutorAddedEventAdapter.class);
// TODO Can't autowire. Find fix
/*
@Autowired
private ConfigProperties config;
*/
@Autowired
private Environment environment;
@Override
public void publishExecutorRemovedEvent(ExecutorRemovedEvent event){
try{
var mqttClient = TapasMqttClient.getInstance(environment.getProperty("mqtt.broker.uri"));
mqttClient.publishMessage("ch/unisg/tapas/executors/removed", ExecutorJsonRepresentation.serialize(event.getExecutorClass()));
}
catch (MqttException e){
LOGGER.error(e.getMessage(), e);
}
}
}

View File

@ -2,8 +2,7 @@ package ch.unisg.executorpool.application.port.in;
import ch.unisg.common.SelfValidating; import ch.unisg.common.SelfValidating;
import ch.unisg.executorpool.domain.ExecutorPool; import ch.unisg.executorpool.domain.ExecutorPool;
import ch.unisg.executorpool.domain.ExecutorClass.ExecutorIp; import ch.unisg.executorpool.domain.ExecutorClass.ExecutorUri;
import ch.unisg.executorpool.domain.ExecutorClass.ExecutorPort;
import ch.unisg.executorpool.domain.ExecutorClass.ExecutorTaskType; import ch.unisg.executorpool.domain.ExecutorClass.ExecutorTaskType;
import lombok.Value; import lombok.Value;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotNull;
@ -11,17 +10,13 @@ import javax.validation.constraints.NotNull;
@Value @Value
public class AddNewExecutorToExecutorPoolCommand extends SelfValidating<AddNewExecutorToExecutorPoolCommand> { public class AddNewExecutorToExecutorPoolCommand extends SelfValidating<AddNewExecutorToExecutorPoolCommand> {
@NotNull @NotNull
private final ExecutorIp executorIp; private final ExecutorUri executorUri;
@NotNull
private final ExecutorPort executorPort;
@NotNull @NotNull
private final ExecutorTaskType executorTaskType; private final ExecutorTaskType executorTaskType;
public AddNewExecutorToExecutorPoolCommand(ExecutorIp executorIp, ExecutorPort executorPort, ExecutorTaskType executorTaskType){ public AddNewExecutorToExecutorPoolCommand(ExecutorUri executorUri, ExecutorTaskType executorTaskType){
this.executorIp = executorIp; this.executorUri = executorUri;
this.executorPort = executorPort;
this.executorTaskType = executorTaskType; this.executorTaskType = executorTaskType;
this.validateSelf(); this.validateSelf();
} }

View File

@ -1,9 +0,0 @@
package ch.unisg.executorpool.application.port.in;
import ch.unisg.executorpool.domain.ExecutorClass;
import java.util.List;
public interface GetAllExecutorInExecutorPoolByTypeUseCase {
List<ExecutorClass> getAllExecutorInExecutorPoolByType(GetAllExecutorInExecutorPoolByTypeQuery query);
}

View File

@ -7,11 +7,11 @@ import lombok.Value;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotNull;
@Value @Value
public class GetAllExecutorInExecutorPoolByTypeQuery extends SelfValidating<GetAllExecutorInExecutorPoolByTypeQuery> { public class GetAllExecutorsInExecutorPoolByTypeQuery extends SelfValidating<GetAllExecutorsInExecutorPoolByTypeQuery> {
@NotNull @NotNull
private final ExecutorTaskType executorTaskType; private final ExecutorTaskType executorTaskType;
public GetAllExecutorInExecutorPoolByTypeQuery(ExecutorTaskType executorTaskType){ public GetAllExecutorsInExecutorPoolByTypeQuery(ExecutorTaskType executorTaskType){
this.executorTaskType = executorTaskType; this.executorTaskType = executorTaskType;
this.validateSelf(); this.validateSelf();
} }

View File

@ -0,0 +1,9 @@
package ch.unisg.executorpool.application.port.in;
import ch.unisg.executorpool.domain.ExecutorClass;
import java.util.List;
public interface GetAllExecutorsInExecutorPoolByTypeUseCase {
List<ExecutorClass> getAllExecutorsInExecutorPoolByType(GetAllExecutorsInExecutorPoolByTypeQuery query);
}

View File

@ -1,9 +1,7 @@
package ch.unisg.executorpool.application.port.in; package ch.unisg.executorpool.application.port.in;
import ch.unisg.executorpool.domain.ExecutorClass;
import ch.unisg.common.SelfValidating; import ch.unisg.common.SelfValidating;
import ch.unisg.executorpool.domain.ExecutorClass.ExecutorIp; import ch.unisg.executorpool.domain.ExecutorClass.ExecutorUri;
import ch.unisg.executorpool.domain.ExecutorClass.ExecutorPort;
import lombok.Value; import lombok.Value;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotNull;
@ -11,14 +9,10 @@ import javax.validation.constraints.NotNull;
@Value @Value
public class RemoveExecutorFromExecutorPoolCommand extends SelfValidating<RemoveExecutorFromExecutorPoolCommand> { public class RemoveExecutorFromExecutorPoolCommand extends SelfValidating<RemoveExecutorFromExecutorPoolCommand> {
@NotNull @NotNull
private final ExecutorIp executorIp; private final ExecutorUri executorUri;
@NotNull public RemoveExecutorFromExecutorPoolCommand(ExecutorUri executorUri){
private final ExecutorPort executorPort; this.executorUri = executorUri;
public RemoveExecutorFromExecutorPoolCommand(ExecutorIp executorIp, ExecutorPort executorPort){
this.executorIp = executorIp;
this.executorPort = executorPort;
this.validateSelf(); this.validateSelf();
} }
} }

View File

@ -0,0 +1,8 @@
package ch.unisg.executorpool.application.port.out;
import ch.unisg.executorpool.domain.ExecutorAddedEvent;
import org.eclipse.paho.client.mqttv3.MqttException;
public interface ExecutorAddedEventPort {
void publishExecutorAddedEvent(ExecutorAddedEvent event);
}

View File

@ -0,0 +1,7 @@
package ch.unisg.executorpool.application.port.out;
import ch.unisg.executorpool.domain.ExecutorRemovedEvent;
public interface ExecutorRemovedEventPort {
void publishExecutorRemovedEvent(ExecutorRemovedEvent event);
}

View File

@ -2,24 +2,33 @@ package ch.unisg.executorpool.application.service;
import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolUseCase; import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolUseCase;
import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolCommand; import ch.unisg.executorpool.application.port.in.AddNewExecutorToExecutorPoolCommand;
import ch.unisg.executorpool.application.port.out.ExecutorAddedEventPort;
import ch.unisg.executorpool.domain.ExecutorAddedEvent;
import ch.unisg.executorpool.domain.ExecutorClass; import ch.unisg.executorpool.domain.ExecutorClass;
import ch.unisg.executorpool.domain.ExecutorPool; import ch.unisg.executorpool.domain.ExecutorPool;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.yaml.snakeyaml.constructor.DuplicateKeyException;
import javax.transaction.Transactional; import javax.transaction.Transactional;
import javax.validation.ConstraintViolationException;
@RequiredArgsConstructor
@Component @Component
@Transactional @Transactional
public class AddNewExecutorToExecutorPoolService implements AddNewExecutorToExecutorPoolUseCase { public class AddNewExecutorToExecutorPoolService implements AddNewExecutorToExecutorPoolUseCase {
private final ExecutorAddedEventPort executorAddedEventPort;
public AddNewExecutorToExecutorPoolService(ExecutorAddedEventPort executorAddedEventPort){
this.executorAddedEventPort = executorAddedEventPort;
}
@Override @Override
public ExecutorClass addNewExecutorToExecutorPool(AddNewExecutorToExecutorPoolCommand command){ public ExecutorClass addNewExecutorToExecutorPool(AddNewExecutorToExecutorPoolCommand command){
ExecutorPool executorPool = ExecutorPool.getExecutorPool(); ExecutorPool executorPool = ExecutorPool.getExecutorPool();
var newExecutor = executorPool.addNewExecutor(command.getExecutorUri(), command.getExecutorTaskType());
return executorPool.addNewExecutor(command.getExecutorIp(), command.getExecutorPort(), command.getExecutorTaskType()); var executorAddedEvent = new ExecutorAddedEvent(newExecutor);
executorAddedEventPort.publishExecutorAddedEvent(executorAddedEvent);
return newExecutor;
} }
} }

View File

@ -1,7 +1,7 @@
package ch.unisg.executorpool.application.service; package ch.unisg.executorpool.application.service;
import ch.unisg.executorpool.application.port.in.GetAllExecutorInExecutorPoolByTypeQuery; import ch.unisg.executorpool.application.port.in.GetAllExecutorsInExecutorPoolByTypeQuery;
import ch.unisg.executorpool.application.port.in.GetAllExecutorInExecutorPoolByTypeUseCase; import ch.unisg.executorpool.application.port.in.GetAllExecutorsInExecutorPoolByTypeUseCase;
import ch.unisg.executorpool.domain.ExecutorClass; import ch.unisg.executorpool.domain.ExecutorClass;
import ch.unisg.executorpool.domain.ExecutorPool; import ch.unisg.executorpool.domain.ExecutorPool;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
@ -13,10 +13,10 @@ import java.util.List;
@RequiredArgsConstructor @RequiredArgsConstructor
@Component @Component
@Transactional @Transactional
public class GetAllExecutorInExecutorPoolByTypeService implements GetAllExecutorInExecutorPoolByTypeUseCase { public class GetAllExecutorsInExecutorPoolByTypeService implements GetAllExecutorsInExecutorPoolByTypeUseCase {
@Override @Override
public List<ExecutorClass> getAllExecutorInExecutorPoolByType(GetAllExecutorInExecutorPoolByTypeQuery query){ public List<ExecutorClass> getAllExecutorsInExecutorPoolByType(GetAllExecutorsInExecutorPoolByTypeQuery query){
ExecutorPool executorPool = ExecutorPool.getExecutorPool(); ExecutorPool executorPool = ExecutorPool.getExecutorPool();
return executorPool.getAllExecutorsByType(query.getExecutorTaskType()); return executorPool.getAllExecutorsByType(query.getExecutorTaskType());
} }

View File

@ -2,21 +2,36 @@ package ch.unisg.executorpool.application.service;
import ch.unisg.executorpool.application.port.in.RemoveExecutorFromExecutorPoolCommand; import ch.unisg.executorpool.application.port.in.RemoveExecutorFromExecutorPoolCommand;
import ch.unisg.executorpool.application.port.in.RemoveExecutorFromExecutorPoolUseCase; import ch.unisg.executorpool.application.port.in.RemoveExecutorFromExecutorPoolUseCase;
import ch.unisg.executorpool.application.port.out.ExecutorRemovedEventPort;
import ch.unisg.executorpool.domain.ExecutorClass; import ch.unisg.executorpool.domain.ExecutorClass;
import ch.unisg.executorpool.domain.ExecutorPool; import ch.unisg.executorpool.domain.ExecutorPool;
import ch.unisg.executorpool.domain.ExecutorRemovedEvent;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.transaction.Transactional; import javax.transaction.Transactional;
import java.util.Optional; import java.util.Optional;
@RequiredArgsConstructor
@Component @Component
@Transactional @Transactional
public class RemoveExecutorFromExecutorPoolService implements RemoveExecutorFromExecutorPoolUseCase { public class RemoveExecutorFromExecutorPoolService implements RemoveExecutorFromExecutorPoolUseCase {
private final ExecutorRemovedEventPort executorRemovedEventPort;
public RemoveExecutorFromExecutorPoolService(ExecutorRemovedEventPort executorRemovedEventPort){
this.executorRemovedEventPort = executorRemovedEventPort;
}
@Override @Override
public Optional<ExecutorClass> removeExecutorFromExecutorPool(RemoveExecutorFromExecutorPoolCommand command){ public Optional<ExecutorClass> removeExecutorFromExecutorPool(RemoveExecutorFromExecutorPoolCommand command){
ExecutorPool executorPool = ExecutorPool.getExecutorPool(); ExecutorPool executorPool = ExecutorPool.getExecutorPool();
return executorPool.removeExecutorByIpAndPort(command.getExecutorIp(), command.getExecutorPort()); var removedExecutor = executorPool.removeExecutorByIpAndPort(command.getExecutorUri());
if(removedExecutor.isPresent()){
var executorRemovedEvent = new ExecutorRemovedEvent(removedExecutor.get());
executorRemovedEventPort.publishExecutorRemovedEvent(executorRemovedEvent);
}
return removedExecutor;
} }
} }

View File

@ -0,0 +1,10 @@
package ch.unisg.executorpool.domain;
import lombok.Getter;
public class ExecutorAddedEvent {
@Getter
private ExecutorClass executorClass;
public ExecutorAddedEvent(ExecutorClass executorClass) { this.executorClass = executorClass; }
}

View File

@ -3,36 +3,29 @@ package ch.unisg.executorpool.domain;
import lombok.Getter; import lombok.Getter;
import lombok.Value; import lombok.Value;
import java.net.URI;
public class ExecutorClass { public class ExecutorClass {
@Getter @Getter
private final ExecutorIp executorIp; private final ExecutorUri executorUri;
@Getter
private final ExecutorPort executorPort;
@Getter @Getter
private final ExecutorTaskType executorTaskType; private final ExecutorTaskType executorTaskType;
public ExecutorClass(ExecutorIp executorIp, ExecutorPort executorPort, ExecutorTaskType executorTaskType){ public ExecutorClass(ExecutorUri executorUri, ExecutorTaskType executorTaskType){
this.executorIp = executorIp; this.executorUri = executorUri;
this.executorPort = executorPort;
this.executorTaskType = executorTaskType; this.executorTaskType = executorTaskType;
} }
protected static ExecutorClass createExecutorClass(ExecutorIp executorIp, ExecutorPort executorPort, ExecutorTaskType executorTaskType){ protected static ExecutorClass createExecutorClass(ExecutorUri executorUri, ExecutorTaskType executorTaskType){
System.out.println("New Task: " + executorIp.getValue() + " " + executorPort.getValue() + " " + executorTaskType.getValue()); System.out.println("New Executor: " + executorUri.value.toString() + " " + executorTaskType.getValue());
return new ExecutorClass(executorIp, executorPort, executorTaskType); return new ExecutorClass(executorUri, executorTaskType);
} }
@Value @Value
public static class ExecutorIp { public static class ExecutorUri {
private String value; private URI value;
}
@Value
public static class ExecutorPort {
private String value;
} }
@Value @Value

View File

@ -1,5 +1,8 @@
package ch.unisg.executorpool.domain; package ch.unisg.executorpool.domain;
import ch.unisg.executorpool.domain.ExecutorClass.ExecutorUri;
import ch.unisg.executorpool.domain.ExecutorClass.ExecutorTaskType;
import lombok.Getter; import lombok.Getter;
import lombok.Value; import lombok.Value;
@ -20,19 +23,17 @@ public class ExecutorPool {
public static ExecutorPool getExecutorPool() { return executorPool; } public static ExecutorPool getExecutorPool() { return executorPool; }
public ExecutorClass addNewExecutor(ExecutorClass.ExecutorIp executorIp, ExecutorClass.ExecutorPort executorPort, ExecutorClass.ExecutorTaskType executorTaskType){ public ExecutorClass addNewExecutor(ExecutorUri executorUri, ExecutorTaskType executorTaskType){
ExecutorClass newExecutor = ExecutorClass.createExecutorClass(executorIp, executorPort, executorTaskType); ExecutorClass newExecutor = ExecutorClass.createExecutorClass(executorUri, executorTaskType);
listOfExecutors.value.add(newExecutor); listOfExecutors.value.add(newExecutor);
System.out.println("Number of executors: " + listOfExecutors.value.size()); System.out.println("Number of executors: " + listOfExecutors.value.size());
return newExecutor; return newExecutor;
} }
public Optional<ExecutorClass> getExecutorByIpAndPort(ExecutorClass.ExecutorIp executorIp, ExecutorClass.ExecutorPort executorPort){ public Optional<ExecutorClass> getExecutorByUri(ExecutorUri executorUri){
for (ExecutorClass executor : listOfExecutors.value ) { for (ExecutorClass executor : listOfExecutors.value ) {
// TODO can this be simplified by overwriting equals()? if(executor.getExecutorUri().getValue().equals(executorUri)){
if(executor.getExecutorIp().getValue().equalsIgnoreCase(executorIp.getValue()) &&
executor.getExecutorPort().getValue().equalsIgnoreCase(executorPort.getValue())){
return Optional.of(executor); return Optional.of(executor);
} }
} }
@ -54,11 +55,10 @@ public class ExecutorPool {
return matchedExecutors; return matchedExecutors;
} }
public Optional<ExecutorClass> removeExecutorByIpAndPort(ExecutorClass.ExecutorIp executorIp, ExecutorClass.ExecutorPort executorPort){ public Optional<ExecutorClass> removeExecutorByIpAndPort(ExecutorUri executorUri){
for (ExecutorClass executor : listOfExecutors.value ) { for (ExecutorClass executor : listOfExecutors.value ) {
// TODO can this be simplified by overwriting equals()? // TODO can this be simplified by overwriting equals()?
if(executor.getExecutorIp().getValue().equalsIgnoreCase(executorIp.getValue()) && if(executor.getExecutorUri().getValue().equals(executorUri.getValue())){
executor.getExecutorPort().getValue().equalsIgnoreCase(executorPort.getValue())){
listOfExecutors.value.remove(executor); listOfExecutors.value.remove(executor);
return Optional.of(executor); return Optional.of(executor);
} }

View File

@ -0,0 +1,11 @@
package ch.unisg.executorpool.domain;
import ch.unisg.executorpool.domain.ExecutorClass;
import lombok.Getter;
public class ExecutorRemovedEvent {
@Getter
private ExecutorClass executorClass;
public ExecutorRemovedEvent(ExecutorClass executorClass) { this.executorClass = executorClass; }
}

View File

@ -1 +1,3 @@
server.port=8083 server.port=8083
mqtt.broker.uri=tcp://localhost:1883

View File

@ -4,10 +4,12 @@ import ch.unisg.tapas.auctionhouse.adapter.common.clients.TapasMqttClient;
import ch.unisg.tapas.auctionhouse.adapter.in.messaging.mqtt.AuctionEventsMqttDispatcher; import ch.unisg.tapas.auctionhouse.adapter.in.messaging.mqtt.AuctionEventsMqttDispatcher;
import ch.unisg.tapas.auctionhouse.adapter.common.clients.WebSubSubscriber; import ch.unisg.tapas.auctionhouse.adapter.common.clients.WebSubSubscriber;
import ch.unisg.tapas.common.AuctionHouseResourceDirectory; import ch.unisg.tapas.common.AuctionHouseResourceDirectory;
import ch.unisg.tapas.common.ConfigProperties;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
@ -21,15 +23,18 @@ import java.util.List;
public class TapasAuctionHouseApplication { public class TapasAuctionHouseApplication {
private static final Logger LOGGER = LogManager.getLogger(TapasAuctionHouseApplication.class); private static final Logger LOGGER = LogManager.getLogger(TapasAuctionHouseApplication.class);
@Autowired
private ConfigProperties config;
public static String RESOURCE_DIRECTORY = "https://api.interactions.ics.unisg.ch/auction-houses/"; public static String RESOURCE_DIRECTORY = "https://api.interactions.ics.unisg.ch/auction-houses/";
public static String MQTT_BROKER = "tcp://broker.hivemq.com:1883"; public static String MQTT_BROKER = "tcp://localhost:1883";
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication tapasAuctioneerApp = new SpringApplication(TapasAuctionHouseApplication.class); SpringApplication tapasAuctioneerApp = new SpringApplication(TapasAuctionHouseApplication.class);
// We will use these bootstrap methods in Week 6: // We will use these bootstrap methods in Week 6:
bootstrapMarketplaceWithWebSub(); bootstrapMarketplaceWithWebSub();
// bootstrapMarketplaceWithMqtt(); bootstrapMarketplaceWithMqtt();
tapasAuctioneerApp.run(args); tapasAuctioneerApp.run(args);
} }

View File

@ -68,7 +68,10 @@ public class TapasMqttClient {
mqttClient.subscribe(topic); mqttClient.subscribe(topic);
} }
private void publishMessage(String topic, String payload) throws MqttException { public void publishMessage(String topic, String payload) throws MqttException {
mqttClient = new org.eclipse.paho.client.mqttv3.MqttClient(brokerAddress, mqttClientId, new MemoryPersistence());
mqttClient.connect();
MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8)); MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
mqttClient.publish(topic, message); mqttClient.publish(topic, message);
} }

View File

@ -7,6 +7,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import java.sql.Timestamp;
/** /**
* Used to expose a representation of the state of an auction through an interface. This class is * Used to expose a representation of the state of an auction through an interface. This class is
* only meant as a starting point when defining a uniform HTTP API for the Auction House: feel free * only meant as a starting point when defining a uniform HTTP API for the Auction House: feel free
@ -28,12 +30,12 @@ public class AuctionJsonRepresentation {
private String taskType; private String taskType;
@Getter @Setter @Getter @Setter
private Integer deadline; private Timestamp deadline;
public AuctionJsonRepresentation() { } public AuctionJsonRepresentation() { }
public AuctionJsonRepresentation(String auctionId, String auctionHouseUri, String taskUri, public AuctionJsonRepresentation(String auctionId, String auctionHouseUri, String taskUri,
String taskType, Integer deadline) { String taskType, Timestamp deadline) {
this.auctionId = auctionId; this.auctionId = auctionId;
this.auctionHouseUri = auctionHouseUri; this.auctionHouseUri = auctionHouseUri;
this.taskUri = taskUri; this.taskUri = taskUri;

View File

@ -1,34 +0,0 @@
package ch.unisg.tapas.auctionhouse.adapter.in.messaging.http;
import ch.unisg.tapas.auctionhouse.application.handler.ExecutorAddedHandler;
import ch.unisg.tapas.auctionhouse.application.port.in.ExecutorAddedEvent;
import ch.unisg.tapas.auctionhouse.domain.Auction;
import ch.unisg.tapas.auctionhouse.domain.ExecutorRegistry;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* Template for receiving an executor added event via HTTP
*/
@RestController
public class ExecutorAddedEventListenerHttpAdapter {
@PostMapping(path = "/executors/{taskType}/{executorId}")
public ResponseEntity<String> handleExecutorAddedEvent(@PathVariable("taskType") String taskType,
@PathVariable("executorId") String executorId) {
ExecutorAddedEvent executorAddedEvent = new ExecutorAddedEvent(
new ExecutorRegistry.ExecutorIdentifier(executorId),
new Auction.AuctionedTaskType(taskType)
);
ExecutorAddedHandler newExecutorHandler = new ExecutorAddedHandler();
newExecutorHandler.handleNewExecutorEvent(executorAddedEvent);
return new ResponseEntity<>(HttpStatus.NO_CONTENT);
}
}

View File

@ -1,16 +0,0 @@
package ch.unisg.tapas.auctionhouse.adapter.in.messaging.http;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
/**
* Template for handling an executor removed event received via an HTTP request
*/
@RestController
public class ExecutorRemovedEventListenerHttpAdapter {
// TODO: add annotations for request method, request URI, etc.
public void handleExecutorRemovedEvent(@PathVariable("executorId") String executorId) {
// TODO: implement logic
}
}

View File

@ -26,7 +26,7 @@ public class AuctionEventsMqttDispatcher {
// TODO: Register here your topics and event listener adapters // TODO: Register here your topics and event listener adapters
private void initRouter() { private void initRouter() {
router.put("ch/unisg/tapas-group-tutors/executors", new ExecutorAddedEventListenerMqttAdapter()); router.put("ch/unisg/tapas/executors/added", new ExecutorAddedEventListenerMqttAdapter());
} }
/** /**

View File

@ -11,6 +11,8 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.net.URI;
/** /**
* Listener that handles events when an executor was added to this TAPAS application. * Listener that handles events when an executor was added to this TAPAS application.
* *
@ -24,16 +26,16 @@ public class ExecutorAddedEventListenerMqttAdapter extends AuctionEventMqttListe
String payload = new String(message.getPayload()); String payload = new String(message.getPayload());
try { try {
// Note: this messge representation is provided only as an example. You should use a // Note: this message representation is provided only as an example. You should use a
// representation that makes sense in the context of your application. // representation that makes sense in the context of your application.
JsonNode data = new ObjectMapper().readTree(payload); JsonNode data = new ObjectMapper().readTree(payload);
String taskType = data.get("taskType").asText(); String executorUri = data.get("executorUri").asText();
String executorId = data.get("executorId").asText(); String executorTaskType = data.get("executorTaskType").asText();
ExecutorAddedEvent executorAddedEvent = new ExecutorAddedEvent( ExecutorAddedEvent executorAddedEvent = new ExecutorAddedEvent(
new ExecutorRegistry.ExecutorIdentifier(executorId), new ExecutorRegistry.ExecutorUri(URI.create(executorUri)),
new Auction.AuctionedTaskType(taskType) new Auction.AuctionedTaskType(executorTaskType)
); );
ExecutorAddedHandler newExecutorHandler = new ExecutorAddedHandler(); ExecutorAddedHandler newExecutorHandler = new ExecutorAddedHandler();

View File

@ -0,0 +1,46 @@
package ch.unisg.tapas.auctionhouse.adapter.in.messaging.mqtt;
import ch.unisg.tapas.auctionhouse.application.handler.ExecutorRemovedHandler;
import ch.unisg.tapas.auctionhouse.application.port.in.ExecutorRemovedEvent;
import ch.unisg.tapas.auctionhouse.domain.Auction;
import ch.unisg.tapas.auctionhouse.domain.ExecutorRegistry;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* Listener that handles events when an executor was removed to this TAPAS application.
*
* This class is only provided as an example to help you bootstrap the project.
*/
public class ExecutorRemovedEventListenerMqttAdapter extends AuctionEventMqttListener {
private static final Logger LOGGER = LogManager.getLogger(ExecutorRemovedEventListenerMqttAdapter.class);
@Override
public boolean handleEvent(MqttMessage message) {
String payload = new String(message.getPayload());
try {
// Note: this messge representation is provided only as an example. You should use a
// representation that makes sense in the context of your application.
JsonNode data = new ObjectMapper().readTree(payload);
String executorId = data.get("executorId").asText();
ExecutorRemovedEvent executorRemovedEvent = new ExecutorRemovedEvent(
new ExecutorRegistry.ExecutorIdentifier(executorId)
);
ExecutorRemovedHandler newExecutorHandler = new ExecutorRemovedHandler();
newExecutorHandler.handleNewExecutorEvent(executorRemovedEvent);
} catch (JsonProcessingException | NullPointerException e) {
LOGGER.error(e.getMessage(), e);
return false;
}
return true;
}
}

View File

@ -0,0 +1,36 @@
package ch.unisg.tapas.auctionhouse.adapter.out.messaging.websub;
import ch.unisg.tapas.auctionhouse.adapter.common.clients.TapasMqttClient;
import ch.unisg.tapas.auctionhouse.adapter.common.formats.AuctionJsonRepresentation;
import ch.unisg.tapas.auctionhouse.adapter.in.messaging.mqtt.AuctionEventsMqttDispatcher;
import ch.unisg.tapas.auctionhouse.application.port.out.AuctionStartedEventPort;
import ch.unisg.tapas.auctionhouse.domain.AuctionStartedEvent;
import ch.unisg.tapas.common.ConfigProperties;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
@Component
@Primary
public class PublishAuctionStartedEventMqttAdapter implements AuctionStartedEventPort {
private static final Logger LOGGER = LogManager.getLogger(PublishAuctionStartedEventMqttAdapter.class);
@Autowired
private ConfigProperties config;
@Override
public void publishAuctionStartedEvent(AuctionStartedEvent event) {
try{
var mqttClient = TapasMqttClient.getInstance(config.getMqttBrokerUri().toString(), new AuctionEventsMqttDispatcher());
mqttClient.publishMessage("ch/unisg/tapas/auctions", AuctionJsonRepresentation.serialize(event.getAuction()));
}
catch (MqttException | JsonProcessingException e){
LOGGER.error(e.getMessage(), e);
}
}
}

View File

@ -29,7 +29,6 @@ import java.util.stream.Collectors;
* This class is a template for publishing auction started events via WebSub. * This class is a template for publishing auction started events via WebSub.
*/ */
@Component @Component
@Primary
public class PublishAuctionStartedEventWebSubAdapter implements AuctionStartedEventPort { public class PublishAuctionStartedEventWebSubAdapter implements AuctionStartedEventPort {
// You can use this object to retrieve properties from application.properties, e.g. the // You can use this object to retrieve properties from application.properties, e.g. the
// WebSub hub publish endpoint, etc. // WebSub hub publish endpoint, etc.

View File

@ -11,6 +11,6 @@ public class ExecutorAddedHandler implements ExecutorAddedEventHandler {
@Override @Override
public boolean handleNewExecutorEvent(ExecutorAddedEvent executorAddedEvent) { public boolean handleNewExecutorEvent(ExecutorAddedEvent executorAddedEvent) {
return ExecutorRegistry.getInstance().addExecutor(executorAddedEvent.getTaskType(), return ExecutorRegistry.getInstance().addExecutor(executorAddedEvent.getTaskType(),
executorAddedEvent.getExecutorId()); executorAddedEvent.getExecutorUri());
} }
} }

View File

@ -14,6 +14,9 @@ public class ExecutorRemovedHandler implements ExecutorRemovedEventHandler {
@Override @Override
public boolean handleExecutorRemovedEvent(ExecutorRemovedEvent executorRemovedEvent) { public boolean handleExecutorRemovedEvent(ExecutorRemovedEvent executorRemovedEvent) {
return ExecutorRegistry.getInstance().removeExecutor(executorRemovedEvent.getExecutorId()); return ExecutorRegistry.getInstance().removeExecutor(executorRemovedEvent.getExecutorUri());
}
public void handleNewExecutorEvent(ExecutorRemovedEvent executorRemovedEvent) {
} }
} }

View File

@ -1,7 +1,8 @@
package ch.unisg.tapas.auctionhouse.application.port.in; package ch.unisg.tapas.auctionhouse.application.port.in;
import ch.unisg.tapas.auctionhouse.domain.Auction.AuctionedTaskType; import ch.unisg.tapas.auctionhouse.domain.Auction.AuctionedTaskType;
import ch.unisg.tapas.auctionhouse.domain.ExecutorRegistry.ExecutorIdentifier; import ch.unisg.tapas.auctionhouse.domain.ExecutorRegistry;
import ch.unisg.tapas.auctionhouse.domain.ExecutorRegistry.ExecutorUri;
import ch.unisg.tapas.common.SelfValidating; import ch.unisg.tapas.common.SelfValidating;
import lombok.Value; import lombok.Value;
@ -13,7 +14,7 @@ import javax.validation.constraints.NotNull;
@Value @Value
public class ExecutorAddedEvent extends SelfValidating<ExecutorAddedEvent> { public class ExecutorAddedEvent extends SelfValidating<ExecutorAddedEvent> {
@NotNull @NotNull
private final ExecutorIdentifier executorId; private final ExecutorRegistry.ExecutorUri executorUri;
@NotNull @NotNull
private final AuctionedTaskType taskType; private final AuctionedTaskType taskType;
@ -21,10 +22,10 @@ public class ExecutorAddedEvent extends SelfValidating<ExecutorAddedEvent> {
/** /**
* Constructs an executor added event. * Constructs an executor added event.
* *
* @param executorId the identifier of the executor that was added to this TAPAS application * @param executorUri the identifier of the executor that was added to this TAPAS application
*/ */
public ExecutorAddedEvent(ExecutorIdentifier executorId, AuctionedTaskType taskType) { public ExecutorAddedEvent(ExecutorUri executorUri, AuctionedTaskType taskType) {
this.executorId = executorId; this.executorUri = executorUri;
this.taskType = taskType; this.taskType = taskType;
this.validateSelf(); this.validateSelf();

View File

@ -1,6 +1,7 @@
package ch.unisg.tapas.auctionhouse.application.port.in; package ch.unisg.tapas.auctionhouse.application.port.in;
import ch.unisg.tapas.auctionhouse.domain.ExecutorRegistry.ExecutorIdentifier; import ch.unisg.tapas.auctionhouse.domain.ExecutorRegistry;
import ch.unisg.tapas.auctionhouse.domain.ExecutorRegistry.ExecutorUri;
import ch.unisg.tapas.common.SelfValidating; import ch.unisg.tapas.common.SelfValidating;
import lombok.Value; import lombok.Value;
@ -12,15 +13,15 @@ import javax.validation.constraints.NotNull;
@Value @Value
public class ExecutorRemovedEvent extends SelfValidating<ExecutorRemovedEvent> { public class ExecutorRemovedEvent extends SelfValidating<ExecutorRemovedEvent> {
@NotNull @NotNull
private final ExecutorIdentifier executorId; private final ExecutorUri executorUri;
/** /**
* Constructs an executor removed event. * Constructs an executor removed event.
* *
* @param executorId the identifier of the executor that was removed from this TAPAS application * @param executorUri
*/ */
public ExecutorRemovedEvent(ExecutorIdentifier executorId) { public ExecutorRemovedEvent(ExecutorUri executorUri) {
this.executorId = executorId; this.executorUri = executorUri;
this.validateSelf(); this.validateSelf();
} }
} }

View File

@ -2,6 +2,7 @@ package ch.unisg.tapas.auctionhouse.application.service;
import ch.unisg.tapas.auctionhouse.application.port.in.LaunchAuctionCommand; import ch.unisg.tapas.auctionhouse.application.port.in.LaunchAuctionCommand;
import ch.unisg.tapas.auctionhouse.application.port.in.LaunchAuctionUseCase; import ch.unisg.tapas.auctionhouse.application.port.in.LaunchAuctionUseCase;
import ch.unisg.tapas.auctionhouse.application.port.in.LaunchAuctionUseCase;
import ch.unisg.tapas.auctionhouse.application.port.out.AuctionWonEventPort; import ch.unisg.tapas.auctionhouse.application.port.out.AuctionWonEventPort;
import ch.unisg.tapas.auctionhouse.application.port.out.AuctionStartedEventPort; import ch.unisg.tapas.auctionhouse.application.port.out.AuctionStartedEventPort;
import ch.unisg.tapas.auctionhouse.domain.*; import ch.unisg.tapas.auctionhouse.domain.*;
@ -11,6 +12,7 @@ import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.sql.Timestamp;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@ -25,7 +27,7 @@ import java.util.concurrent.TimeUnit;
public class StartAuctionService implements LaunchAuctionUseCase { public class StartAuctionService implements LaunchAuctionUseCase {
private static final Logger LOGGER = LogManager.getLogger(StartAuctionService.class); private static final Logger LOGGER = LogManager.getLogger(StartAuctionService.class);
private final static int DEFAULT_AUCTION_DEADLINE_MILLIS = 10000; private final Timestamp DEFAULT_AUCTION_DEADLINE_MILLIS = Timestamp.valueOf("1970-01-01 00:00:01");
// Event port used to publish an auction started event // Event port used to publish an auction started event
private final AuctionStartedEventPort auctionStartedEventPort; private final AuctionStartedEventPort auctionStartedEventPort;
@ -63,7 +65,7 @@ public class StartAuctionService implements LaunchAuctionUseCase {
auctions.addAuction(auction); auctions.addAuction(auction);
// Schedule the closing of the auction at the deadline // Schedule the closing of the auction at the deadline
service.schedule(new CloseAuctionTask(auction.getAuctionId()), deadline.getValue(), service.schedule(new CloseAuctionTask(auction.getAuctionId()), deadline.getValue().getTime() - System.currentTimeMillis(),
TimeUnit.MILLISECONDS); TimeUnit.MILLISECONDS);
// Publish an auction started event // Publish an auction started event

View File

@ -4,6 +4,7 @@ import lombok.Getter;
import lombok.Value; import lombok.Value;
import java.net.URI; import java.net.URI;
import java.sql.Timestamp;
import java.util.*; import java.util.*;
/** /**
@ -166,6 +167,6 @@ public class Auction {
@Value @Value
public static class AuctionDeadline { public static class AuctionDeadline {
int value; Timestamp value;
} }
} }

View File

@ -2,6 +2,7 @@ package ch.unisg.tapas.auctionhouse.domain;
import lombok.Value; import lombok.Value;
import java.net.URI;
import java.util.*; import java.util.*;
/** /**
@ -13,7 +14,7 @@ import java.util.*;
public class ExecutorRegistry { public class ExecutorRegistry {
private static ExecutorRegistry registry; private static ExecutorRegistry registry;
private final Map<Auction.AuctionedTaskType, Set<ExecutorIdentifier>> executors; private final Map<Auction.AuctionedTaskType, Set<ExecutorUri>> executors;
private ExecutorRegistry() { private ExecutorRegistry() {
this.executors = new Hashtable<>(); this.executors = new Hashtable<>();
@ -31,14 +32,14 @@ public class ExecutorRegistry {
* Adds an executor to the registry for a given task type. * Adds an executor to the registry for a given task type.
* *
* @param taskType the type of the task * @param taskType the type of the task
* @param executorIdentifier the identifier of the executor (can be any string) * @param executorUri the executor's URI
* @return true unless a runtime exception occurs * @return true unless a runtime exception occurs
*/ */
public boolean addExecutor(Auction.AuctionedTaskType taskType, ExecutorIdentifier executorIdentifier) { public boolean addExecutor(Auction.AuctionedTaskType taskType, ExecutorUri executorUri) {
Set<ExecutorIdentifier> taskTypeExecs = executors.getOrDefault(taskType, Set<ExecutorUri> taskTypeExecs = executors.getOrDefault(taskType,
Collections.synchronizedSet(new HashSet<>())); Collections.synchronizedSet(new HashSet<>()));
taskTypeExecs.add(executorIdentifier); taskTypeExecs.add(executorUri);
executors.put(taskType, taskTypeExecs); executors.put(taskType, taskTypeExecs);
return true; return true;
@ -47,17 +48,17 @@ public class ExecutorRegistry {
/** /**
* Removes an executor from the registry. The executor is disassociated from all known task types. * Removes an executor from the registry. The executor is disassociated from all known task types.
* *
* @param executorIdentifier the identifier of the executor (can be any string) * @param executorUri the executor's URI
* @return true unless a runtime exception occurs * @return true unless a runtime exception occurs
*/ */
public boolean removeExecutor(ExecutorIdentifier executorIdentifier) { public boolean removeExecutor(ExecutorUri executorUri) {
Iterator<Auction.AuctionedTaskType> iterator = executors.keySet().iterator(); Iterator<Auction.AuctionedTaskType> iterator = executors.keySet().iterator();
while (iterator.hasNext()) { while (iterator.hasNext()) {
Auction.AuctionedTaskType taskType = iterator.next(); Auction.AuctionedTaskType taskType = iterator.next();
Set<ExecutorIdentifier> set = executors.get(taskType); Set<ExecutorUri> set = executors.get(taskType);
set.remove(executorIdentifier); set.remove(executorUri);
if (set.isEmpty()) { if (set.isEmpty()) {
iterator.remove(); iterator.remove();
@ -80,7 +81,7 @@ public class ExecutorRegistry {
// Value Object for the executor identifier // Value Object for the executor identifier
@Value @Value
public static class ExecutorIdentifier { public static class ExecutorUri {
String value; URI value;
} }
} }

View File

@ -61,4 +61,14 @@ public class ConfigProperties {
public URI getTaskListUri() { public URI getTaskListUri() {
return URI.create(environment.getProperty("tasks.list.uri")); return URI.create(environment.getProperty("tasks.list.uri"));
} }
/**
* Retrieves the URI of the MQTT broker.
*
* @return the URI of the MQTT broker
*/
public URI getMqttBrokerUri() {
return URI.create(environment.getProperty("mqtt.broker.uri"));
}
} }

View File

@ -10,3 +10,4 @@ tasks.list.uri=https://tapas-tasks.86-119-34-23.nip.io/
application.environment=development application.environment=development
auctionhouse.uri=http://localhost:8086 auctionhouse.uri=http://localhost:8086
websub.hub.uri=http://localhost:3000 websub.hub.uri=http://localhost:3000
mqtt.broker.uri=tcp://localhost:1883

View File

@ -5,6 +5,7 @@ import ch.unisg.tapastasks.tasks.application.port.in.DeleteTaskCommand;
import ch.unisg.tapastasks.tasks.application.port.in.DeleteTaskUseCase; import ch.unisg.tapastasks.tasks.application.port.in.DeleteTaskUseCase;
import ch.unisg.tapastasks.tasks.domain.Task; import ch.unisg.tapastasks.tasks.domain.Task;
import ch.unisg.tapastasks.tasks.domain.TaskList; import ch.unisg.tapastasks.tasks.domain.TaskList;
import jdk.jshell.spi.ExecutionControl;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -23,11 +24,10 @@ public class DeleteTaskService implements DeleteTaskUseCase {
Optional<Task> updatedTask = taskList.retrieveTaskById(command.getTaskId()); Optional<Task> updatedTask = taskList.retrieveTaskById(command.getTaskId());
Task newTask = updatedTask.get(); Task newTask = updatedTask.get();
// TODO: Fill in the right condition into the if-statement and the else-statement // TODO: Fill in the right condition into the if-statement and the else-statement
if (/*the task can be deleted*/){ if (true){
return taskList.deleteTaskById(command.getTaskId()); return taskList.deleteTaskById(command.getTaskId());
} else {
/*send message back to TaskList that the task cannot be deleted*/
} }
// TODO Handle with a return message
return Optional.empty();
} }
} }