Merge branch 'auctionhouse-websub' of https://github.com/SCS-ASSE-FS21-Group1/tapas into auctionhouse-websub

This commit is contained in:
2021-11-14 22:09:33 +01:00
48 changed files with 532 additions and 256 deletions

View File

@@ -4,10 +4,12 @@ import ch.unisg.tapas.auctionhouse.adapter.common.clients.TapasMqttClient;
import ch.unisg.tapas.auctionhouse.adapter.in.messaging.mqtt.AuctionEventsMqttDispatcher;
import ch.unisg.tapas.auctionhouse.adapter.common.clients.WebSubSubscriber;
import ch.unisg.tapas.common.AuctionHouseResourceDirectory;
import ch.unisg.tapas.common.ConfigProperties;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -21,15 +23,18 @@ import java.util.List;
public class TapasAuctionHouseApplication {
private static final Logger LOGGER = LogManager.getLogger(TapasAuctionHouseApplication.class);
@Autowired
private ConfigProperties config;
public static String RESOURCE_DIRECTORY = "https://api.interactions.ics.unisg.ch/auction-houses/";
public static String MQTT_BROKER = "tcp://broker.hivemq.com:1883";
public static String MQTT_BROKER = "tcp://localhost:1883";
public static void main(String[] args) {
SpringApplication tapasAuctioneerApp = new SpringApplication(TapasAuctionHouseApplication.class);
// We will use these bootstrap methods in Week 6:
bootstrapMarketplaceWithWebSub();
// bootstrapMarketplaceWithMqtt();
bootstrapMarketplaceWithMqtt();
tapasAuctioneerApp.run(args);
}

View File

@@ -68,7 +68,10 @@ public class TapasMqttClient {
mqttClient.subscribe(topic);
}
private void publishMessage(String topic, String payload) throws MqttException {
public void publishMessage(String topic, String payload) throws MqttException {
mqttClient = new org.eclipse.paho.client.mqttv3.MqttClient(brokerAddress, mqttClientId, new MemoryPersistence());
mqttClient.connect();
MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
mqttClient.publish(topic, message);
}

View File

@@ -7,6 +7,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;
import java.sql.Timestamp;
/**
* Used to expose a representation of the state of an auction through an interface. This class is
* only meant as a starting point when defining a uniform HTTP API for the Auction House: feel free
@@ -28,12 +30,12 @@ public class AuctionJsonRepresentation {
private String taskType;
@Getter @Setter
private Integer deadline;
private Timestamp deadline;
public AuctionJsonRepresentation() { }
public AuctionJsonRepresentation(String auctionId, String auctionHouseUri, String taskUri,
String taskType, Integer deadline) {
String taskType, Timestamp deadline) {
this.auctionId = auctionId;
this.auctionHouseUri = auctionHouseUri;
this.taskUri = taskUri;

View File

@@ -1,34 +0,0 @@
package ch.unisg.tapas.auctionhouse.adapter.in.messaging.http;
import ch.unisg.tapas.auctionhouse.application.handler.ExecutorAddedHandler;
import ch.unisg.tapas.auctionhouse.application.port.in.ExecutorAddedEvent;
import ch.unisg.tapas.auctionhouse.domain.Auction;
import ch.unisg.tapas.auctionhouse.domain.ExecutorRegistry;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* Template for receiving an executor added event via HTTP
*/
@RestController
public class ExecutorAddedEventListenerHttpAdapter {
@PostMapping(path = "/executors/{taskType}/{executorId}")
public ResponseEntity<String> handleExecutorAddedEvent(@PathVariable("taskType") String taskType,
@PathVariable("executorId") String executorId) {
ExecutorAddedEvent executorAddedEvent = new ExecutorAddedEvent(
new ExecutorRegistry.ExecutorIdentifier(executorId),
new Auction.AuctionedTaskType(taskType)
);
ExecutorAddedHandler newExecutorHandler = new ExecutorAddedHandler();
newExecutorHandler.handleNewExecutorEvent(executorAddedEvent);
return new ResponseEntity<>(HttpStatus.NO_CONTENT);
}
}

View File

@@ -1,16 +0,0 @@
package ch.unisg.tapas.auctionhouse.adapter.in.messaging.http;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
/**
* Template for handling an executor removed event received via an HTTP request
*/
@RestController
public class ExecutorRemovedEventListenerHttpAdapter {
// TODO: add annotations for request method, request URI, etc.
public void handleExecutorRemovedEvent(@PathVariable("executorId") String executorId) {
// TODO: implement logic
}
}

View File

@@ -26,7 +26,7 @@ public class AuctionEventsMqttDispatcher {
// TODO: Register here your topics and event listener adapters
private void initRouter() {
router.put("ch/unisg/tapas-group-tutors/executors", new ExecutorAddedEventListenerMqttAdapter());
router.put("ch/unisg/tapas/executors/added", new ExecutorAddedEventListenerMqttAdapter());
}
/**

View File

@@ -11,6 +11,8 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.net.URI;
/**
* Listener that handles events when an executor was added to this TAPAS application.
*
@@ -24,16 +26,16 @@ public class ExecutorAddedEventListenerMqttAdapter extends AuctionEventMqttListe
String payload = new String(message.getPayload());
try {
// Note: this messge representation is provided only as an example. You should use a
// Note: this message representation is provided only as an example. You should use a
// representation that makes sense in the context of your application.
JsonNode data = new ObjectMapper().readTree(payload);
String taskType = data.get("taskType").asText();
String executorId = data.get("executorId").asText();
String executorUri = data.get("executorUri").asText();
String executorTaskType = data.get("executorTaskType").asText();
ExecutorAddedEvent executorAddedEvent = new ExecutorAddedEvent(
new ExecutorRegistry.ExecutorIdentifier(executorId),
new Auction.AuctionedTaskType(taskType)
new ExecutorRegistry.ExecutorUri(URI.create(executorUri)),
new Auction.AuctionedTaskType(executorTaskType)
);
ExecutorAddedHandler newExecutorHandler = new ExecutorAddedHandler();

View File

@@ -0,0 +1,46 @@
package ch.unisg.tapas.auctionhouse.adapter.in.messaging.mqtt;
import ch.unisg.tapas.auctionhouse.application.handler.ExecutorRemovedHandler;
import ch.unisg.tapas.auctionhouse.application.port.in.ExecutorRemovedEvent;
import ch.unisg.tapas.auctionhouse.domain.Auction;
import ch.unisg.tapas.auctionhouse.domain.ExecutorRegistry;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* Listener that handles events when an executor was removed to this TAPAS application.
*
* This class is only provided as an example to help you bootstrap the project.
*/
public class ExecutorRemovedEventListenerMqttAdapter extends AuctionEventMqttListener {
private static final Logger LOGGER = LogManager.getLogger(ExecutorRemovedEventListenerMqttAdapter.class);
@Override
public boolean handleEvent(MqttMessage message) {
String payload = new String(message.getPayload());
try {
// Note: this messge representation is provided only as an example. You should use a
// representation that makes sense in the context of your application.
JsonNode data = new ObjectMapper().readTree(payload);
String executorId = data.get("executorId").asText();
ExecutorRemovedEvent executorRemovedEvent = new ExecutorRemovedEvent(
new ExecutorRegistry.ExecutorIdentifier(executorId)
);
ExecutorRemovedHandler newExecutorHandler = new ExecutorRemovedHandler();
newExecutorHandler.handleNewExecutorEvent(executorRemovedEvent);
} catch (JsonProcessingException | NullPointerException e) {
LOGGER.error(e.getMessage(), e);
return false;
}
return true;
}
}

View File

@@ -0,0 +1,36 @@
package ch.unisg.tapas.auctionhouse.adapter.out.messaging.websub;
import ch.unisg.tapas.auctionhouse.adapter.common.clients.TapasMqttClient;
import ch.unisg.tapas.auctionhouse.adapter.common.formats.AuctionJsonRepresentation;
import ch.unisg.tapas.auctionhouse.adapter.in.messaging.mqtt.AuctionEventsMqttDispatcher;
import ch.unisg.tapas.auctionhouse.application.port.out.AuctionStartedEventPort;
import ch.unisg.tapas.auctionhouse.domain.AuctionStartedEvent;
import ch.unisg.tapas.common.ConfigProperties;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
@Component
@Primary
public class PublishAuctionStartedEventMqttAdapter implements AuctionStartedEventPort {
private static final Logger LOGGER = LogManager.getLogger(PublishAuctionStartedEventMqttAdapter.class);
@Autowired
private ConfigProperties config;
@Override
public void publishAuctionStartedEvent(AuctionStartedEvent event) {
try{
var mqttClient = TapasMqttClient.getInstance(config.getMqttBrokerUri().toString(), new AuctionEventsMqttDispatcher());
mqttClient.publishMessage("ch/unisg/tapas/auctions", AuctionJsonRepresentation.serialize(event.getAuction()));
}
catch (MqttException | JsonProcessingException e){
LOGGER.error(e.getMessage(), e);
}
}
}

View File

@@ -29,7 +29,6 @@ import java.util.stream.Collectors;
* This class is a template for publishing auction started events via WebSub.
*/
@Component
@Primary
public class PublishAuctionStartedEventWebSubAdapter implements AuctionStartedEventPort {
// You can use this object to retrieve properties from application.properties, e.g. the
// WebSub hub publish endpoint, etc.

View File

@@ -11,6 +11,6 @@ public class ExecutorAddedHandler implements ExecutorAddedEventHandler {
@Override
public boolean handleNewExecutorEvent(ExecutorAddedEvent executorAddedEvent) {
return ExecutorRegistry.getInstance().addExecutor(executorAddedEvent.getTaskType(),
executorAddedEvent.getExecutorId());
executorAddedEvent.getExecutorUri());
}
}

View File

@@ -14,6 +14,9 @@ public class ExecutorRemovedHandler implements ExecutorRemovedEventHandler {
@Override
public boolean handleExecutorRemovedEvent(ExecutorRemovedEvent executorRemovedEvent) {
return ExecutorRegistry.getInstance().removeExecutor(executorRemovedEvent.getExecutorId());
return ExecutorRegistry.getInstance().removeExecutor(executorRemovedEvent.getExecutorUri());
}
public void handleNewExecutorEvent(ExecutorRemovedEvent executorRemovedEvent) {
}
}

View File

@@ -1,7 +1,8 @@
package ch.unisg.tapas.auctionhouse.application.port.in;
import ch.unisg.tapas.auctionhouse.domain.Auction.AuctionedTaskType;
import ch.unisg.tapas.auctionhouse.domain.ExecutorRegistry.ExecutorIdentifier;
import ch.unisg.tapas.auctionhouse.domain.ExecutorRegistry;
import ch.unisg.tapas.auctionhouse.domain.ExecutorRegistry.ExecutorUri;
import ch.unisg.tapas.common.SelfValidating;
import lombok.Value;
@@ -13,7 +14,7 @@ import javax.validation.constraints.NotNull;
@Value
public class ExecutorAddedEvent extends SelfValidating<ExecutorAddedEvent> {
@NotNull
private final ExecutorIdentifier executorId;
private final ExecutorRegistry.ExecutorUri executorUri;
@NotNull
private final AuctionedTaskType taskType;
@@ -21,10 +22,10 @@ public class ExecutorAddedEvent extends SelfValidating<ExecutorAddedEvent> {
/**
* Constructs an executor added event.
*
* @param executorId the identifier of the executor that was added to this TAPAS application
* @param executorUri the identifier of the executor that was added to this TAPAS application
*/
public ExecutorAddedEvent(ExecutorIdentifier executorId, AuctionedTaskType taskType) {
this.executorId = executorId;
public ExecutorAddedEvent(ExecutorUri executorUri, AuctionedTaskType taskType) {
this.executorUri = executorUri;
this.taskType = taskType;
this.validateSelf();

View File

@@ -1,6 +1,7 @@
package ch.unisg.tapas.auctionhouse.application.port.in;
import ch.unisg.tapas.auctionhouse.domain.ExecutorRegistry.ExecutorIdentifier;
import ch.unisg.tapas.auctionhouse.domain.ExecutorRegistry;
import ch.unisg.tapas.auctionhouse.domain.ExecutorRegistry.ExecutorUri;
import ch.unisg.tapas.common.SelfValidating;
import lombok.Value;
@@ -12,15 +13,15 @@ import javax.validation.constraints.NotNull;
@Value
public class ExecutorRemovedEvent extends SelfValidating<ExecutorRemovedEvent> {
@NotNull
private final ExecutorIdentifier executorId;
private final ExecutorUri executorUri;
/**
* Constructs an executor removed event.
*
* @param executorId the identifier of the executor that was removed from this TAPAS application
* @param executorUri
*/
public ExecutorRemovedEvent(ExecutorIdentifier executorId) {
this.executorId = executorId;
public ExecutorRemovedEvent(ExecutorUri executorUri) {
this.executorUri = executorUri;
this.validateSelf();
}
}

View File

@@ -2,6 +2,7 @@ package ch.unisg.tapas.auctionhouse.application.service;
import ch.unisg.tapas.auctionhouse.application.port.in.LaunchAuctionCommand;
import ch.unisg.tapas.auctionhouse.application.port.in.LaunchAuctionUseCase;
import ch.unisg.tapas.auctionhouse.application.port.in.LaunchAuctionUseCase;
import ch.unisg.tapas.auctionhouse.application.port.out.AuctionWonEventPort;
import ch.unisg.tapas.auctionhouse.application.port.out.AuctionStartedEventPort;
import ch.unisg.tapas.auctionhouse.domain.*;
@@ -11,6 +12,7 @@ import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.sql.Timestamp;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -25,7 +27,7 @@ import java.util.concurrent.TimeUnit;
public class StartAuctionService implements LaunchAuctionUseCase {
private static final Logger LOGGER = LogManager.getLogger(StartAuctionService.class);
private final static int DEFAULT_AUCTION_DEADLINE_MILLIS = 10000;
private final Timestamp DEFAULT_AUCTION_DEADLINE_MILLIS = Timestamp.valueOf("1970-01-01 00:00:01");
// Event port used to publish an auction started event
private final AuctionStartedEventPort auctionStartedEventPort;
@@ -63,7 +65,7 @@ public class StartAuctionService implements LaunchAuctionUseCase {
auctions.addAuction(auction);
// Schedule the closing of the auction at the deadline
service.schedule(new CloseAuctionTask(auction.getAuctionId()), deadline.getValue(),
service.schedule(new CloseAuctionTask(auction.getAuctionId()), deadline.getValue().getTime() - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
// Publish an auction started event

View File

@@ -4,6 +4,7 @@ import lombok.Getter;
import lombok.Value;
import java.net.URI;
import java.sql.Timestamp;
import java.util.*;
/**
@@ -166,6 +167,6 @@ public class Auction {
@Value
public static class AuctionDeadline {
int value;
Timestamp value;
}
}

View File

@@ -2,6 +2,7 @@ package ch.unisg.tapas.auctionhouse.domain;
import lombok.Value;
import java.net.URI;
import java.util.*;
/**
@@ -13,7 +14,7 @@ import java.util.*;
public class ExecutorRegistry {
private static ExecutorRegistry registry;
private final Map<Auction.AuctionedTaskType, Set<ExecutorIdentifier>> executors;
private final Map<Auction.AuctionedTaskType, Set<ExecutorUri>> executors;
private ExecutorRegistry() {
this.executors = new Hashtable<>();
@@ -31,14 +32,14 @@ public class ExecutorRegistry {
* Adds an executor to the registry for a given task type.
*
* @param taskType the type of the task
* @param executorIdentifier the identifier of the executor (can be any string)
* @param executorUri the executor's URI
* @return true unless a runtime exception occurs
*/
public boolean addExecutor(Auction.AuctionedTaskType taskType, ExecutorIdentifier executorIdentifier) {
Set<ExecutorIdentifier> taskTypeExecs = executors.getOrDefault(taskType,
public boolean addExecutor(Auction.AuctionedTaskType taskType, ExecutorUri executorUri) {
Set<ExecutorUri> taskTypeExecs = executors.getOrDefault(taskType,
Collections.synchronizedSet(new HashSet<>()));
taskTypeExecs.add(executorIdentifier);
taskTypeExecs.add(executorUri);
executors.put(taskType, taskTypeExecs);
return true;
@@ -47,17 +48,17 @@ public class ExecutorRegistry {
/**
* Removes an executor from the registry. The executor is disassociated from all known task types.
*
* @param executorIdentifier the identifier of the executor (can be any string)
* @param executorUri the executor's URI
* @return true unless a runtime exception occurs
*/
public boolean removeExecutor(ExecutorIdentifier executorIdentifier) {
public boolean removeExecutor(ExecutorUri executorUri) {
Iterator<Auction.AuctionedTaskType> iterator = executors.keySet().iterator();
while (iterator.hasNext()) {
Auction.AuctionedTaskType taskType = iterator.next();
Set<ExecutorIdentifier> set = executors.get(taskType);
Set<ExecutorUri> set = executors.get(taskType);
set.remove(executorIdentifier);
set.remove(executorUri);
if (set.isEmpty()) {
iterator.remove();
@@ -80,7 +81,7 @@ public class ExecutorRegistry {
// Value Object for the executor identifier
@Value
public static class ExecutorIdentifier {
String value;
public static class ExecutorUri {
URI value;
}
}

View File

@@ -61,4 +61,14 @@ public class ConfigProperties {
public URI getTaskListUri() {
return URI.create(environment.getProperty("tasks.list.uri"));
}
/**
* Retrieves the URI of the MQTT broker.
*
* @return the URI of the MQTT broker
*/
public URI getMqttBrokerUri() {
return URI.create(environment.getProperty("mqtt.broker.uri"));
}
}

View File

@@ -10,3 +10,4 @@ tasks.list.uri=https://tapas-tasks.86-119-34-23.nip.io/
application.environment=development
auctionhouse.uri=http://localhost:8086
websub.hub.uri=http://localhost:3000
mqtt.broker.uri=tcp://localhost:1883