WebSub fixes
This commit is contained in:
parent
32bf461026
commit
7a66f37556
|
@ -3,20 +3,18 @@ package ch.unisg.tapas;
|
|||
import ch.unisg.tapas.auctionhouse.adapter.common.clients.TapasMqttClient;
|
||||
import ch.unisg.tapas.auctionhouse.adapter.in.messaging.mqtt.AuctionEventsMqttDispatcher;
|
||||
import ch.unisg.tapas.auctionhouse.domain.AuctionHouseDiscovery;
|
||||
import ch.unisg.tapas.auctionhouse.domain.AuctionHouseDiscoveryInformation;
|
||||
import ch.unisg.tapas.auctionhouse.adapter.common.clients.WebSubSubscriber;
|
||||
import ch.unisg.tapas.common.AuctionHouseResourceDirectory;
|
||||
import ch.unisg.tapas.common.ConfigProperties;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.core.env.ConfigurableEnvironment;
|
||||
import org.springframework.core.env.Environment;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -28,23 +26,17 @@ import java.util.concurrent.TimeUnit;
|
|||
public class TapasAuctionHouseApplication {
|
||||
private static final Logger LOGGER = LogManager.getLogger(TapasAuctionHouseApplication.class);
|
||||
|
||||
public static String RESOURCE_DIRECTORY = "http://localhost:3500";
|
||||
public static String DEFAULT_MQTT_BROKER = "tcp://broker.hivemq.com:1883";
|
||||
|
||||
private static ConfigurableEnvironment ENVIRONMENT;
|
||||
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication tapasAuctioneerApp = new SpringApplication(TapasAuctionHouseApplication.class);
|
||||
ENVIRONMENT = tapasAuctioneerApp.run(args).getEnvironment();
|
||||
|
||||
// TODO Set start up of message services with config
|
||||
// We will use these bootstrap methods in Week 6:
|
||||
bootstrapMarketplaceWithWebSub();
|
||||
bootstrapMarketplaceWithMqtt();
|
||||
|
||||
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
|
||||
executor.scheduleAtFixedRate(crawlerRunnable, 30, 30, TimeUnit.SECONDS);
|
||||
// bootstrapMarketplaceWithMqtt()
|
||||
}
|
||||
/**
|
||||
* Discovers auction houses and subscribes to WebSub notifications
|
||||
|
@ -52,11 +44,12 @@ public class TapasAuctionHouseApplication {
|
|||
private static void bootstrapMarketplaceWithWebSub() {
|
||||
discoverAuctionHouseEndpoints();
|
||||
|
||||
// WebSubSubscriber subscriber = new WebSubSubscriber();
|
||||
WebSubSubscriber subscriber = new WebSubSubscriber(ENVIRONMENT.getProperty("auctionhouse.uri"));
|
||||
|
||||
// for (String endpoint : auctionHouseEndpoints) {
|
||||
// subscriber.subscribeToAuctionHouseEndpoint(URI.create(endpoint));
|
||||
// }
|
||||
for (AuctionHouseDiscoveryInformation endpoint : AuctionHouseDiscovery.getInstance().getAuctionHouseDiscoveryList()) {
|
||||
subscriber.subscribeToAuctionHouseEndpoint(endpoint.getWebSubUri().getValue());
|
||||
}
|
||||
//subscriber.subscribeToAuctionHouseEndpoint("https://websub.rocks/blog/100/v7wVgkzRrZXTadY3pXjx");
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -83,17 +76,20 @@ public class TapasAuctionHouseApplication {
|
|||
|
||||
private static void discoverAuctionHouseEndpoints() {
|
||||
AuctionHouseResourceDirectory rd = new AuctionHouseResourceDirectory(
|
||||
URI.create(RESOURCE_DIRECTORY)
|
||||
URI.create(ENVIRONMENT.getProperty("discovery.endpoint.uri"))
|
||||
);
|
||||
|
||||
AuctionHouseDiscovery.getInstance().addAuctionHouseDiscoveryInformation(rd.retrieveAuctionHouseEndpoints());
|
||||
|
||||
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
|
||||
executor.scheduleAtFixedRate(crawlerRunnable, 300, 300, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
|
||||
private static Runnable crawlerRunnable = new Runnable() {
|
||||
public void run() {
|
||||
AuctionHouseResourceDirectory rd = new AuctionHouseResourceDirectory(
|
||||
URI.create(RESOURCE_DIRECTORY)
|
||||
URI.create(ENVIRONMENT.getProperty("discovery.endpoint.uri"))
|
||||
);
|
||||
|
||||
AuctionHouseDiscovery.getInstance().addAuctionHouseDiscoveryInformation(rd.retrieveAuctionHouseEndpoints());
|
||||
|
|
|
@ -2,9 +2,11 @@ package ch.unisg.tapas.auctionhouse.adapter.common.clients;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.HttpRequest;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.util.HashMap;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
|
@ -18,22 +20,23 @@ import org.springframework.http.HttpStatus;
|
|||
*/
|
||||
public class WebSubSubscriber {
|
||||
|
||||
// TODO get this somehow from properties file. But on clue how to do this with static variables
|
||||
static String WEBSUB_HUB_ENDPOINT = "http://localhost:3000";
|
||||
static String AUCTION_HOUSE_ENDPOINT = "http://localhost:8086";
|
||||
|
||||
Logger logger = Logger.getLogger(WebSubSubscriber.class.getName());
|
||||
|
||||
public void subscribeToAuctionHouseEndpoint(URI endpoint) {
|
||||
// TODO decide with other groups about auction house endpoint uri to discover websub topics
|
||||
// and replace the hardcoded one with it
|
||||
String topic = discoverWebSubTopic("http://localhost:3100/websub");
|
||||
String AUCTION_HOUSE_ENDPOINT;
|
||||
|
||||
if (topic == null) {
|
||||
public WebSubSubscriber(String AUCTION_HOUSE_ENDPOINT) {
|
||||
this.AUCTION_HOUSE_ENDPOINT = AUCTION_HOUSE_ENDPOINT;
|
||||
}
|
||||
|
||||
public void subscribeToAuctionHouseEndpoint(URI endpoint) {
|
||||
|
||||
HashMap<String, String> links = discoverWebSubTopic(endpoint);
|
||||
|
||||
if (links.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
subscribeToWebSub(topic);
|
||||
subscribeToWebSub(links.get("hub"), links.get("self"));
|
||||
|
||||
// Shoudl be done :D
|
||||
// TODO Subscribe to the auction house endpoint via WebSub:
|
||||
|
@ -52,23 +55,30 @@ public class WebSubSubscriber {
|
|||
// - the implementation notes of the WebSub hub you are using to distribute events
|
||||
}
|
||||
|
||||
private String discoverWebSubTopic(String endpoint) {
|
||||
private HashMap<String, String> discoverWebSubTopic(URI endpoint) {
|
||||
HttpClient client = HttpClient.newHttpClient();
|
||||
HttpRequest request = HttpRequest.newBuilder()
|
||||
.uri(URI.create(endpoint))
|
||||
.uri(endpoint)
|
||||
.header("Content-Type", "application/json")
|
||||
.GET()
|
||||
.build();
|
||||
|
||||
HashMap<String, String> links = new HashMap<>();
|
||||
|
||||
try {
|
||||
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
|
||||
if (response.statusCode() == HttpStatus.OK.value()) {
|
||||
// TODO decide with other groups about response structure and replace the hardcoded
|
||||
// uri with response uri
|
||||
JSONObject jsonObject = new JSONObject(response.body());
|
||||
System.out.println(jsonObject);
|
||||
return jsonObject.getString("topic");
|
||||
for (String link : response.headers().allValues("link")) {
|
||||
if (link.contains("rel=\"hub\"")) {
|
||||
String hub = link.split(">")[0];
|
||||
links.put("hub", hub.substring(1));
|
||||
} else if(link.contains("rel=\"self\"")) {
|
||||
String self = link.split(">")[0];
|
||||
links.put("self", self.substring(1));
|
||||
}
|
||||
System.out.println(link);
|
||||
}
|
||||
// TODO check for HTML tags second if links are not present in headers
|
||||
} else {
|
||||
logger.log(Level.SEVERE, "Could not find a websub uri");
|
||||
}
|
||||
|
@ -78,24 +88,26 @@ public class WebSubSubscriber {
|
|||
} catch (IOException e) {
|
||||
logger.log(Level.SEVERE, e.getLocalizedMessage(), e);
|
||||
}
|
||||
return null;
|
||||
return links;
|
||||
}
|
||||
|
||||
private void subscribeToWebSub(String topic) {
|
||||
private void subscribeToWebSub(String hub, String topic) {
|
||||
HttpClient client = HttpClient.newHttpClient();
|
||||
|
||||
String body = new JSONObject()
|
||||
.put("hub.callback", AUCTION_HOUSE_ENDPOINT + "/auction-started")
|
||||
.put("hub.mode", "subscribe")
|
||||
.put("hub.topic", topic)
|
||||
.put("hub.ws", false)
|
||||
.toString();
|
||||
|
||||
URI hubURI;
|
||||
try {
|
||||
hubURI = new URI(hub);
|
||||
} catch (URISyntaxException e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
return;
|
||||
}
|
||||
|
||||
HttpRequest request = HttpRequest.newBuilder()
|
||||
.uri(URI.create(WEBSUB_HUB_ENDPOINT))
|
||||
.header("Content-Type", "application/json")
|
||||
.POST(HttpRequest.BodyPublishers.ofString(body))
|
||||
.uri(hubURI)
|
||||
.header("Content-Type", "application/x-www-form-urlencoded")
|
||||
.POST(HttpRequest.BodyPublishers.ofString("hub.mode=subscribe&hub.callback=" + AUCTION_HOUSE_ENDPOINT +
|
||||
"/auction-started/74c72c7f-2739-4124-aa35-a3225171a97c" + "&hub.topic=" + topic))
|
||||
.build();
|
||||
|
||||
|
||||
|
|
|
@ -34,9 +34,12 @@ public class AuctionStartedEventListenerWebSubAdapter {
|
|||
* @return 200 OK
|
||||
* @throws URISyntaxException
|
||||
**/
|
||||
@PostMapping(path = "/auction-started")
|
||||
// TODO generate a new capability ID instead of using a hardcoded one.
|
||||
@PostMapping(path = "/auction-started/74c72c7f-2739-4124-aa35-a3225171a97c")
|
||||
public ResponseEntity<Void> handleExecutorAddedEvent(@RequestBody Collection<AuctionJsonRepresentation> payload) throws URISyntaxException {
|
||||
|
||||
System.out.println("new auctions :O");
|
||||
|
||||
for (AuctionJsonRepresentation auction : payload) {
|
||||
auctionStartedHandler.handleAuctionStartedEvent(
|
||||
new AuctionStartedEvent(
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
package ch.unisg.tapas.auctionhouse.adapter.in.messaging.websub;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
/**
|
||||
* This class is a template for handling auction started events received via WebSub
|
||||
*/
|
||||
@RestController
|
||||
public class DiscoverWebSubAdapter {
|
||||
|
||||
@Value("${websub.hub.uri}")
|
||||
private String webSubHubUri;
|
||||
|
||||
@Value("${auctionhouse.uri}")
|
||||
private String auctionHouseUri;
|
||||
|
||||
/**
|
||||
* Controller which listens to auction-started callbacks
|
||||
* @return 200 OK
|
||||
**/
|
||||
@GetMapping(path = "/websub/auctions")
|
||||
public ResponseEntity<String> handleDiscoverWebSubAuction() {
|
||||
|
||||
HttpHeaders header = new HttpHeaders();
|
||||
header.add("link", "<" + auctionHouseUri + "/auctions/>; rel=\"self\"");
|
||||
header.add("link", "<" + webSubHubUri + ">; rel=\"hub\"");
|
||||
|
||||
return ResponseEntity.ok().headers(header).body("");
|
||||
}
|
||||
}
|
|
@ -13,21 +13,9 @@ import org.springframework.web.bind.annotation.*;
|
|||
@RestController
|
||||
public class ValidateIntentWebSubAdapter {
|
||||
|
||||
@Value("${application.environment}")
|
||||
private String environment;
|
||||
|
||||
@GetMapping(path = "/auction-started")
|
||||
// TODO generate a new capability ID instead of using a hardcoded one.
|
||||
@GetMapping(path = "/auction-started/74c72c7f-2739-4124-aa35-a3225171a97c")
|
||||
public ResponseEntity<String> validateIntent(@RequestParam("hub.challenge") String challenge) {
|
||||
// Different implementation depending on local development or production
|
||||
if (environment.equalsIgnoreCase("development")) {
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.add("Content-Type", "application/json");
|
||||
String body = new JSONObject()
|
||||
.put("hub.challenge", challenge)
|
||||
.toString();
|
||||
return new ResponseEntity<>(body, headers, HttpStatus.OK);
|
||||
} else {
|
||||
return new ResponseEntity<>(challenge, HttpStatus.OK);
|
||||
}
|
||||
return new ResponseEntity<>(challenge, HttpStatus.OK);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,7 +15,6 @@ import org.springframework.context.annotation.Primary;
|
|||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
@Primary
|
||||
public class PublishAuctionStartedEventMqttAdapter implements AuctionStartedEventPort {
|
||||
|
||||
private static final Logger LOGGER = LogManager.getLogger(PublishAuctionStartedEventMqttAdapter.class);
|
||||
|
|
|
@ -29,12 +29,8 @@ import java.util.stream.Collectors;
|
|||
* This class is a template for publishing auction started events via WebSub.
|
||||
*/
|
||||
@Component
|
||||
@Primary
|
||||
public class PublishAuctionStartedEventWebSubAdapter implements AuctionStartedEventPort {
|
||||
// You can use this object to retrieve properties from application.properties, e.g. the
|
||||
// WebSub hub publish endpoint, etc.
|
||||
@Autowired
|
||||
private ConfigProperties config;
|
||||
|
||||
@Value("${auctionhouse.uri}")
|
||||
private String auctionHouseUri;
|
||||
|
||||
|
@ -47,15 +43,12 @@ public class PublishAuctionStartedEventWebSubAdapter implements AuctionStartedEv
|
|||
public void publishAuctionStartedEvent(AuctionStartedEvent event) {
|
||||
HttpClient client = HttpClient.newHttpClient();
|
||||
|
||||
String body = new JSONObject()
|
||||
.put("hub.url", auctionHouseUri + "/auctions")
|
||||
.put("hub.mode", "publish")
|
||||
.toString();
|
||||
String body = "hub.url=" + auctionHouseUri + "/auctions/&hub.mode=publish";
|
||||
|
||||
|
||||
HttpRequest request = HttpRequest.newBuilder()
|
||||
.uri(URI.create(webSubHubUri))
|
||||
.header("Content-Type", "application/json")
|
||||
.header("Content-Type", "application/x-www-form-urlencoded")
|
||||
.POST(HttpRequest.BodyPublishers.ofString(body))
|
||||
.build();
|
||||
|
||||
|
|
|
@ -20,6 +20,9 @@ public class AuctionHouseDiscovery {
|
|||
}
|
||||
};
|
||||
|
||||
// TODO load from config
|
||||
static String AUCTION_HOUSE_URI = "http://localhost:8086";
|
||||
|
||||
@Getter
|
||||
private List<AuctionHouseDiscoveryInformation> auctionHouseDiscoveryList = new ArrayList<>() {
|
||||
};
|
||||
|
@ -28,8 +31,8 @@ public class AuctionHouseDiscovery {
|
|||
try {
|
||||
// Add our information to list
|
||||
auctionHouseDiscoveryList.add(new AuctionHouseDiscoveryInformation(
|
||||
new AuctionHouseDiscoveryInformation.AuctionHouseUri(new URI("http://localhost:8086")),
|
||||
new AuctionHouseDiscoveryInformation.WebSubUri(new URI("http://localhost:8086/websub")),
|
||||
new AuctionHouseDiscoveryInformation.AuctionHouseUri(new URI(AUCTION_HOUSE_URI)),
|
||||
new AuctionHouseDiscoveryInformation.WebSubUri(new URI(AUCTION_HOUSE_URI + "/websub/auctions")),
|
||||
new AuctionHouseDiscoveryInformation.TaskTypes(tasktypes),
|
||||
new AuctionHouseDiscoveryInformation.TimeStamp(new Timestamp(new Date().getTime())),
|
||||
new AuctionHouseDiscoveryInformation.GroupName("Group 1")
|
||||
|
|
|
@ -10,6 +10,8 @@ auction.house.uri=https://tapas-auction-house.86-119-35-40.nip.io
|
|||
tasks.list.uri=http://localhost:8081
|
||||
|
||||
application.environment=development
|
||||
auctionhouse.uri=http://localhost:8086
|
||||
websub.hub.uri=http://localhost:3000
|
||||
auctionhouse.uri=http://fe10-77-59-152-182.eu.ngrok.io
|
||||
websub.hub.uri=https://pubsubhubbub.appspot.com
|
||||
mqtt.broker.uri=tcp://localhost:1883
|
||||
|
||||
discovery.endpoint.uri=http://localhost:3500/discovery/
|
||||
|
|
Loading…
Reference in New Issue
Block a user