From 67559e478fc799114ba8c3cad1be8c56dfeed84c Mon Sep 17 00:00:00 2001 From: nattamon Date: Fri, 21 Nov 2025 01:57:34 -0500 Subject: [PATCH 01/18] ADD user actor --- app/actors/UserActor.java | 136 +++++++++++++++++++++++++++ app/actors/UserParentActor.java | 44 +++++++++ app/assets/javascripts/index.coffee | 138 ++++++++++++++++++++++++++++ app/controllers/HomeController.java | 100 +++++++++++--------- app/modules/PekkoModule.java | 21 +++++ app/views/index.scala.html | 12 ++- app/views/main.scala.html | 2 +- app/views/search.scala.html | 4 +- conf/application.conf | 1 + conf/routes | 8 +- project/plugins.sbt | 4 + 11 files changed, 419 insertions(+), 51 deletions(-) create mode 100644 app/actors/UserActor.java create mode 100644 app/actors/UserParentActor.java create mode 100644 app/assets/javascripts/index.coffee create mode 100644 app/modules/PekkoModule.java diff --git a/app/actors/UserActor.java b/app/actors/UserActor.java new file mode 100644 index 0000000..523b9d2 --- /dev/null +++ b/app/actors/UserActor.java @@ -0,0 +1,136 @@ +package actors; + +import org.apache.pekko.Done; +import org.apache.pekko.NotUsed; +import org.apache.pekko.actor.typed.ActorRef; +import org.apache.pekko.actor.typed.Behavior; +import org.apache.pekko.actor.typed.javadsl.ActorContext; +import org.apache.pekko.actor.typed.javadsl.Behaviors; +import org.apache.pekko.stream.Materializer; +import org.apache.pekko.stream.javadsl.*; +import org.apache.pekko.japi.Pair; +import com.fasterxml.jackson.databind.JsonNode; +import play.libs.Json; +import play.libs.ws.WSClient; +import models.Search; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletionStage; + +public class UserActor { + + // --- Messages --- + public interface Message {} + + public static final class Connect implements Message { + final ActorRef> replyTo; + public Connect(ActorRef> replyTo) { + this.replyTo = replyTo; + } + } + + public static final class PerformSearch implements Message { + public final String query; + public final String sortBy; + public PerformSearch(String query, String sortBy) { + this.query = query; + this.sortBy = sortBy; + } + } + + // Internal message to stop the actor safely + private static final class InternalStop implements Message {} + + // --- State --- + private final String id; + private final WSClient ws; + private final String apiKey; + private final ActorContext context; + private final List searchHistory = new ArrayList<>(); + + private final Sink hubSink; + private final Flow websocketFlow; + + // --- Factory --- + public static Behavior create(String id, WSClient ws, String apiKey) { + return Behaviors.setup(context -> new UserActor(id, ws, apiKey, context).behavior()); + } + + // --- Constructor --- + private UserActor(String id, WSClient ws, String apiKey, ActorContext context) { + this.id = id; + this.ws = ws; + this.apiKey = apiKey; + this.context = context; + + Materializer mat = Materializer.matFromSystem(context.getSystem()); + + ActorRef self = context.getSelf(); + + Pair, Source> sinkSourcePair = + MergeHub.of(JsonNode.class, 16) + .toMat(BroadcastHub.of(JsonNode.class, 256), Keep.both()) + .run(mat); + + this.hubSink = sinkSourcePair.first(); + Source hubSource = sinkSourcePair.second(); + + // Handle incoming messages from the WebSocket + Sink> jsonSink = Sink.foreach((JsonNode json) -> { + if (json.has("query")) { + String query = json.get("query").asText(); + String sortBy = "popularity"; + if (json.has("sortBy")) { + sortBy = json.get("sortBy").asText(); + } + self.tell(new PerformSearch(query, sortBy)); + } + }); + + // ping websocket to keep it alive + Source heartbeat = Source.tick( + Duration.ZERO, + Duration.ofSeconds(30), + (JsonNode) Json.newObject().put("type", "ping") + ).mapMaterializedValue(c -> NotUsed.getInstance()); + + this.websocketFlow = Flow.fromSinkAndSourceCoupled(jsonSink, hubSource.merge(heartbeat)) + .watchTermination((n, stage) -> { + stage.whenComplete((done, throwable) -> self.tell(new InternalStop())); + return NotUsed.getInstance(); + }); + } + + // --- Behavior --- + private Behavior behavior() { + return Behaviors.receive(Message.class) + .onMessage(Connect.class, msg -> { + context.getLog().info("Client connected: {}", id); + msg.replyTo.tell(websocketFlow); + return Behaviors.same(); + }) + .onMessage(PerformSearch.class, msg -> { + context.getLog().info("User {} searching for: {}", id, msg.query); + + Search search = new Search(msg.query, msg.sortBy, apiKey); + + search.fetchResults(ws).thenAccept(v -> { + searchHistory.add(0, search); + if (searchHistory.size() > 10) searchHistory.remove(searchHistory.size() - 1); + + // Send updated results back to the user + JsonNode response = Json.toJson(searchHistory); + Source.single(response).runWith(hubSink, Materializer.matFromSystem(context.getSystem())); + }); + return Behaviors.same(); + }) + // Handle the stop message + .onMessage(InternalStop.class, msg -> { + context.getLog().info("WebSocket closed for user {}. Stopping actor.", id); + return Behaviors.stopped(); + }) + .build(); + } +} \ No newline at end of file diff --git a/app/actors/UserParentActor.java b/app/actors/UserParentActor.java new file mode 100644 index 0000000..5252332 --- /dev/null +++ b/app/actors/UserParentActor.java @@ -0,0 +1,44 @@ +package actors; + +import org.apache.pekko.NotUsed; +import org.apache.pekko.actor.typed.ActorRef; +import org.apache.pekko.actor.typed.Behavior; +import org.apache.pekko.actor.typed.javadsl.Behaviors; +import org.apache.pekko.stream.javadsl.Flow; +import com.fasterxml.jackson.databind.JsonNode; +import play.libs.ws.WSClient; + +/** + * Parent actor that manages the creation of individual UserActors for each WebSocket connection. + */ +public final class UserParentActor { + private UserParentActor() {} + + // Message to request the creation of a UserActor + public static final class Create { + final String id; + final ActorRef> replyTo; + + public Create(String id, ActorRef> replyTo) { + this.id = id; + this.replyTo = replyTo; + } + } + + public static Behavior create(WSClient wsClient, String apiKey) { + return Behaviors.setup(context -> { + return Behaviors.receive(Create.class) + .onMessage(Create.class, create -> { + // Spawn a new UserActor for this specific connection ID + ActorRef child = context.spawn( + UserActor.create(create.id, wsClient, apiKey), + "userActor-" + create.id + ); + // Ask the child to establish the stream + child.tell(new UserActor.Connect(create.replyTo)); + return Behaviors.same(); + }) + .build(); + }); + } +} \ No newline at end of file diff --git a/app/assets/javascripts/index.coffee b/app/assets/javascripts/index.coffee new file mode 100644 index 0000000..a9e3d58 --- /dev/null +++ b/app/assets/javascripts/index.coffee @@ -0,0 +1,138 @@ +$ -> + console.log "--> Starting NotiLytics Client Script" + + ws = null + + # Form Submit Handler + $("#searchForm").submit (event) -> + console.log "--> Blocking reload." + event.preventDefault() + + query = $("#searchbox").val() + sortBy = $("input[name='sortBy']:checked").val() + + if ws and ws.readyState == WebSocket.OPEN + console.log "--> Sending via WebSocket:", query, " Sort:", sortBy + # Send the query as a JSON object + ws.send(JSON.stringify({query: query, sortBy: sortBy})) + # clear search box + $("#searchbox").val("") + else + console.error "--> WebSocket is not connected! Cannot search." + + # WebSocket Connection + wsUrl = $("#ws-data").data("ws-url") + console.log "--> Found WS URL:", wsUrl + + if wsUrl + ws = new WebSocket(wsUrl) + + ws.onopen = -> + console.log "--> WebSocket Connection OPEN" + + ws.onmessage = (event) -> + # The server sends the entire Search History as a JSON List + data = JSON.parse event.data + if data.type == "ping" + console.log "--> Received Keep-Alive Ping" + return + if Array.isArray(data) + console.log "--> Received History Update" + updateResults(data) + + ws.onerror = (error) -> + console.error "--> WebSocket Error:", error + else + console.error "--> Error: No WebSocket URL found in HTML #ws-data" + +# Result Rendering Logic +updateResults = (history) -> + container = $("#results-area") + container.empty() # Clear previous results + + # loop list of Search objects + for search in history + + # Build the Search Header (Sentiment, Stats, Readability) + + # Calculate Sentiment HTML + sentimentHtml = "" + if search.sentiment == ":-)" + sentimentHtml = ":-)" + else if search.sentiment == ":-(" + sentimentHtml = ":-(" + else + sentimentHtml = ":-|" + + # main search part + searchHtml = $(""" +
+

Search Results

+ +

+ Query: #{search.rawQuery} | + Sorted by: #{search.sortBy} | + Sentiment: #{sentimentHtml} +

+ +

+ Average Flesch-Kincaid Reading Grade: #{search.fleschGradeLevel.toFixed(2)} | + Flesch Reading Ease Score: #{search.fleshReadingScore.toFixed(2)} +

+

+ Average Flesch-Kincaid Reading Grade (OSS): #{search.fleschGradeLevelOSS.toFixed(2)} | + Flesch Reading Ease Score (OSS): #{search.fleshReadingScoreOSS.toFixed(2)} +

+ +

+ + Statistics for "#{search.rawQuery}" + +

+ +
+
+
+ """) + + # Build the Article List + articleContainer = searchHtml.find(".article-list") + + # Limit to top 10 results + articles = search.results.slice(0, 10) + + for article in articles + # Construct the Source Link URL + # Route: /source/:sourceName?identifier=:identifier + sourceUrl = "/source/#{encodeURIComponent(article.sourceName)}?identifier=#{encodeURIComponent(article.sourceIdentifier)}" + + articleCard = $(""" +
+
+ #{article.title} +
+

+ + Source: #{article.sourceName} | + Published: #{article.publishedAtEDT} + +

+

+ + Flesch-Kincaid Grade Level: #{article.fleschKincaidGradeLevel.toFixed(2)} | + Flesch Reading Ease Score: #{article.fleschReadingScore.toFixed(2)} + +

+

+ + OSS Flesch-Kincaid Grade Level: #{article.fleschKincaidGradeLevelOSS.toFixed(2)} | + OSS Flesch Reading Ease Score: #{article.fleschReadingScoreOSS.toFixed(2)} + +

+

Description: #{article.description}

+
+ """) + articleContainer.append(articleCard) + + # Append the full search block to the page + container.append(searchHtml) \ No newline at end of file diff --git a/app/controllers/HomeController.java b/app/controllers/HomeController.java index e9de0e6..14f3d53 100644 --- a/app/controllers/HomeController.java +++ b/app/controllers/HomeController.java @@ -31,6 +31,8 @@ public class HomeController extends Controller { /** Asynchronous cache used to store session history and source data with a TTL. */ private final AsyncCacheApi asyncCache; + private final ActorSystem actorSystem; + private final ActorRef userParentActor; /** * Creates a HomeController with the required web client, caching system, and API key. @@ -41,10 +43,15 @@ public class HomeController extends Controller { * @param config the application's configuration, which contains the API key */ @Inject - public HomeController(WSClient ws, AsyncCacheApi asyncCache, Config config) { + public HomeController(WSClient ws, AsyncCacheApi asyncCache, Config config, ActorSystem actorSystem) { this.ws = ws; this.asyncCache = asyncCache; this.apiKey = config.getString("api.key"); + + this.actorSystem = actorSystem; + this.userParentActor = actorSystem.systemActorOf( + UserParentActor.create(ws, apiKey), "UserParentActor", Props.empty() + ); } /** @@ -57,26 +64,35 @@ public HomeController(WSClient ws, AsyncCacheApi asyncCache, Config config) { * @param request the HTTP request containing session information * @return renders the index or results page with the session token */ - public CompletionStage index(Http.Request request) { + public Result index(Http.Request request) { + return ok(views.html.index.render(request)); + } - // Use session token or generate one - String token = request.session().get("user_token").orElseGet(() -> { - String newToken = UUID.randomUUID().toString(); - return newToken; - }); + public WebSocket ws() { + return WebSocket.Json.acceptOrResult(request -> { + // Use the request ID or session ID to identify the user + String id = request.session().get("user_token").orElse(Long.toString(request.asScala().id())); - return asyncCache.get(token).thenApply(optionalHistory -> { - // Get cache history, or use empty list if nothing is cached - List history = (ArrayList) optionalHistory.orElseGet(ArrayList::new); + Scheduler scheduler = actorSystem.scheduler(); - // Render the appropriate page - Result result = history.isEmpty() - ? ok(views.html.index.render()) - : ok(views.html.results.render(history)); + // Ask the UserParentActor to create a flow for this user + CompletionStage> future = AskPattern.ask( + userParentActor, + (ActorRef> replyTo) -> new UserParentActor.Create(id, replyTo), + Duration.ofSeconds(10), + scheduler + ); // Ensure session token is set return result.addingToSession(request, "user_token", token); }); + return future.thenApply(flow -> + Either.>Right(flow) + ).exceptionally(e -> { + e.printStackTrace(); + return Either.Left(internalServerError("Websocket Error")); + }); + }); } /** @@ -90,34 +106,34 @@ public CompletionStage index(Http.Request request) { * @param sortBy the sort parameter for the search * @return a CompletionStage rendering the results page with updated history and sentiment */ - public CompletionStage search(Http.Request request, String rawQuery, String sortBy) { - - // Use session token or generate one - String token = request.session().get("user_token").orElseGet(() -> { - String newToken = UUID.randomUUID().toString(); - return newToken; - }); - - // Get cached history (might be null) - return asyncCache.get(token).thenCompose(optionalHistory -> { - List history = (ArrayList) optionalHistory.orElseGet(ArrayList::new); - - // Create search object - Search searchForm = new Search(rawQuery, sortBy, apiKey); - - // Fetch results asynchronously - return searchForm.fetchResults(this.ws) - .thenApply(v -> { - // First: Prepend new search and update cache - history.addFirst(searchForm); - // if more than 10 search, remove the oldest one - if(history.size() > 10) { history.removeLast(); } - asyncCache.set(token, history, 3600); - return ok(views.html.results.render(history)) - .addingToSession(request, "user_token", token); - }); - }); - } +// public CompletionStage search(Http.Request request, String rawQuery, String sortBy) { +// +// // Use session token or generate one +// String token = request.session().get("user_token").orElseGet(() -> { +// String newToken = UUID.randomUUID().toString(); +// return newToken; +// }); +// +// // Get cached history (might be null) +// return asyncCache.get(token).thenCompose(optionalHistory -> { +// List history = (ArrayList) optionalHistory.orElseGet(ArrayList::new); +// +// // Create search object +// Search searchForm = new Search(rawQuery, sortBy, apiKey); +// +// // Fetch results asynchronously +// return searchForm.fetchResults(this.ws) +// .thenApply(v -> { +// // First: Prepend new search and update cache +// history.addFirst(searchForm); +// // if more than 10 search, remove the oldest one +// if(history.size() > 10) { history.removeLast(); } +// asyncCache.set(token, history, 3600); +// return ok(views.html.results.render(history)) +// .addingToSession(request, "user_token", token); +// }); +// }); +// } /** * This action is performed when the source page is called. It retrieves the cache of all sources' information and calls fetchSourcePage diff --git a/app/modules/PekkoModule.java b/app/modules/PekkoModule.java new file mode 100644 index 0000000..07d9a62 --- /dev/null +++ b/app/modules/PekkoModule.java @@ -0,0 +1,21 @@ +package modules; + +import com.google.inject.AbstractModule; +import com.google.inject.Provides; +import com.google.inject.Singleton; +import org.apache.pekko.actor.typed.ActorSystem; +import org.apache.pekko.actor.typed.javadsl.Behaviors; + +public class PekkoModule extends AbstractModule { + @Override + protected void configure() { + // No explicit bindings here — provided below + } + + @Provides + @Singleton + public ActorSystem provideActorSystem() { + return ActorSystem.create(Behaviors.empty(), "MyPekkoSystem"); + } + +} \ No newline at end of file diff --git a/app/views/index.scala.html b/app/views/index.scala.html index dfe487d..f19f31a 100644 --- a/app/views/index.scala.html +++ b/app/views/index.scala.html @@ -1,5 +1,11 @@ -@() +@(request: play.mvc.Http.Request) @main("NotiLytics") { - @search() -} +
+ + @search() + +
+ + +} \ No newline at end of file diff --git a/app/views/main.scala.html b/app/views/main.scala.html index 270cd7b..b5d4431 100644 --- a/app/views/main.scala.html +++ b/app/views/main.scala.html @@ -20,13 +20,13 @@ @title + @* And here's where we render the `Html` object containing * the page content. *@ @content - diff --git a/app/views/search.scala.html b/app/views/search.scala.html index 90051e0..3befe6b 100644 --- a/app/views/search.scala.html +++ b/app/views/search.scala.html @@ -9,11 +9,11 @@

Welcome to NotiLytics (made by DNSL)

-
+
- +
diff --git a/conf/application.conf b/conf/application.conf index 3241053..249a5b9 100644 --- a/conf/application.conf +++ b/conf/application.conf @@ -2,5 +2,6 @@ # https://www.playframework.com/documentation/latest/ConfigFile play.modules.enabled += "play.api.cache.ehcache.EhCacheModule" +play.modules.enabled += "modules.PekkoModule" api.key = "1002d5e24b1a4a0b92719ed6281694a9" \ No newline at end of file diff --git a/conf/routes b/conf/routes index b59f7ec..525db5e 100644 --- a/conf/routes +++ b/conf/routes @@ -3,9 +3,11 @@ # ~~~~ # An example controller showing a sample home page -GET / controllers.HomeController.index(request: play.mvc.Http.Request) -GET /notilytics controllers.HomeController.index(request: play.mvc.Http.Request) -GET /search controllers.HomeController.search(request: play.mvc.Http.Request, rawQuery: String, sortBy: String) +#GET / controllers.HomeController.index(request: play.mvc.Http.Request) +#GET /notilytics controllers.HomeController.index(request: play.mvc.Http.Request) +GET / controllers.HomeController.index(request :Request) +GET /ws controllers.HomeController.ws +#GET /search controllers.HomeController.search(request: play.mvc.Http.Request, rawQuery: String, sortBy: String) GET /source/*sourceName controllers.HomeController.source(identifier: String, sourceName: String) GET /search/stats controllers.HomeController.stats(request: play.mvc.Http.Request, rawQuery: String) diff --git a/project/plugins.sbt b/project/plugins.sbt index b434c39..466a949 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,3 +1,7 @@ addSbtPlugin("org.playframework" % "sbt-plugin" % "3.0.9") addSbtPlugin("com.github.sbt" % "sbt-jacoco" % "3.5.0") + +addSbtPlugin("com.github.sbt" % "sbt-less" % "2.0.1") + +addSbtPlugin("com.github.sbt" % "sbt-coffeescript" % "2.0.1") \ No newline at end of file From bf6086869a3c69af730bc13f497666418c9ef53a Mon Sep 17 00:00:00 2001 From: nattamon Date: Fri, 21 Nov 2025 02:05:29 -0500 Subject: [PATCH 02/18] EDIT import --- app/controllers/HomeController.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/app/controllers/HomeController.java b/app/controllers/HomeController.java index 14f3d53..f513f29 100644 --- a/app/controllers/HomeController.java +++ b/app/controllers/HomeController.java @@ -1,11 +1,18 @@ package controllers; +import actors.UserParentActor; +import com.fasterxml.jackson.databind.JsonNode; import com.typesafe.config.Config; import models.SourcesInfo; +import org.apache.pekko.NotUsed; +import org.apache.pekko.actor.typed.Scheduler; +import org.apache.pekko.stream.javadsl.Flow; import play.cache.AsyncCacheApi; +import play.libs.F.Either; import play.mvc.*; import javax.inject.Inject; +import java.time.Duration; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -14,6 +21,12 @@ import models.Source; import models.StatsInfo; import play.libs.ws.*; + +import org.apache.pekko.actor.typed.ActorRef; +import org.apache.pekko.actor.typed.ActorSystem; +import org.apache.pekko.actor.typed.Props; +import org.apache.pekko.actor.typed.javadsl.AskPattern; + /** * Main application controller that handles requests for searching news, viewing sources, * displaying results, and navigating between pages. This controller manages session tokens @@ -83,9 +96,6 @@ public WebSocket ws() { scheduler ); - // Ensure session token is set - return result.addingToSession(request, "user_token", token); - }); return future.thenApply(flow -> Either.>Right(flow) ).exceptionally(e -> { From aac19d0c616c717cfb7a2c0a783843a17f8c3468 Mon Sep 17 00:00:00 2001 From: nattamon Date: Fri, 21 Nov 2025 11:51:56 -0500 Subject: [PATCH 03/18] EDIT change parentUser actor to supervisorActor & tmp add cache for stat --- ...{UserParentActor.java => SupervisorActor.java} | 10 ++++++---- app/actors/UserActor.java | 15 ++++++++++++--- app/controllers/HomeController.java | 9 +++++---- 3 files changed, 23 insertions(+), 11 deletions(-) rename app/actors/{UserParentActor.java => SupervisorActor.java} (89%) diff --git a/app/actors/UserParentActor.java b/app/actors/SupervisorActor.java similarity index 89% rename from app/actors/UserParentActor.java rename to app/actors/SupervisorActor.java index 5252332..992d7e7 100644 --- a/app/actors/UserParentActor.java +++ b/app/actors/SupervisorActor.java @@ -6,13 +6,14 @@ import org.apache.pekko.actor.typed.javadsl.Behaviors; import org.apache.pekko.stream.javadsl.Flow; import com.fasterxml.jackson.databind.JsonNode; +import play.cache.AsyncCacheApi; import play.libs.ws.WSClient; /** * Parent actor that manages the creation of individual UserActors for each WebSocket connection. */ -public final class UserParentActor { - private UserParentActor() {} +public final class SupervisorActor { + private SupervisorActor() {} // Message to request the creation of a UserActor public static final class Create { @@ -25,13 +26,14 @@ public Create(String id, ActorRef> replyTo) { } } - public static Behavior create(WSClient wsClient, String apiKey) { + // TODO rm cache + public static Behavior create(WSClient wsClient, String apiKey, AsyncCacheApi asyncCache) { return Behaviors.setup(context -> { return Behaviors.receive(Create.class) .onMessage(Create.class, create -> { // Spawn a new UserActor for this specific connection ID ActorRef child = context.spawn( - UserActor.create(create.id, wsClient, apiKey), + UserActor.create(create.id, wsClient, apiKey, asyncCache), "userActor-" + create.id ); // Ask the child to establish the stream diff --git a/app/actors/UserActor.java b/app/actors/UserActor.java index 523b9d2..a7c2162 100644 --- a/app/actors/UserActor.java +++ b/app/actors/UserActor.java @@ -10,6 +10,7 @@ import org.apache.pekko.stream.javadsl.*; import org.apache.pekko.japi.Pair; import com.fasterxml.jackson.databind.JsonNode; +import play.cache.AsyncCacheApi; import play.libs.Json; import play.libs.ws.WSClient; import models.Search; @@ -53,17 +54,22 @@ private static final class InternalStop implements Message {} private final Sink hubSink; private final Flow websocketFlow; + // TODO rm cache + private final AsyncCacheApi asyncCache; // --- Factory --- - public static Behavior create(String id, WSClient ws, String apiKey) { - return Behaviors.setup(context -> new UserActor(id, ws, apiKey, context).behavior()); + // TODO rm cache + public static Behavior create(String id, WSClient ws, String apiKey, AsyncCacheApi asyncCache) { + return Behaviors.setup(context -> new UserActor(id, ws, apiKey, asyncCache, context).behavior()); } // --- Constructor --- - private UserActor(String id, WSClient ws, String apiKey, ActorContext context) { + // TODO rm cache + private UserActor(String id, WSClient ws, String apiKey, AsyncCacheApi asyncCache, ActorContext context) { this.id = id; this.ws = ws; this.apiKey = apiKey; this.context = context; + this.asyncCache = asyncCache; Materializer mat = Materializer.matFromSystem(context.getSystem()); @@ -120,6 +126,9 @@ private Behavior behavior() { searchHistory.add(0, search); if (searchHistory.size() > 10) searchHistory.remove(searchHistory.size() - 1); + // TODO rm cache + asyncCache.set(id, new ArrayList<>(searchHistory)); + // Send updated results back to the user JsonNode response = Json.toJson(searchHistory); Source.single(response).runWith(hubSink, Materializer.matFromSystem(context.getSystem())); diff --git a/app/controllers/HomeController.java b/app/controllers/HomeController.java index f513f29..b92bd55 100644 --- a/app/controllers/HomeController.java +++ b/app/controllers/HomeController.java @@ -1,6 +1,6 @@ package controllers; -import actors.UserParentActor; +import actors.SupervisorActor; import com.fasterxml.jackson.databind.JsonNode; import com.typesafe.config.Config; import models.SourcesInfo; @@ -45,7 +45,7 @@ public class HomeController extends Controller { private final AsyncCacheApi asyncCache; private final ActorSystem actorSystem; - private final ActorRef userParentActor; + private final ActorRef userParentActor; /** * Creates a HomeController with the required web client, caching system, and API key. @@ -62,8 +62,9 @@ public HomeController(WSClient ws, AsyncCacheApi asyncCache, Config config, Acto this.apiKey = config.getString("api.key"); this.actorSystem = actorSystem; + // TODO rm cache this.userParentActor = actorSystem.systemActorOf( - UserParentActor.create(ws, apiKey), "UserParentActor", Props.empty() + SupervisorActor.create(ws, apiKey, asyncCache), "UserParentActor", Props.empty() ); } @@ -91,7 +92,7 @@ public WebSocket ws() { // Ask the UserParentActor to create a flow for this user CompletionStage> future = AskPattern.ask( userParentActor, - (ActorRef> replyTo) -> new UserParentActor.Create(id, replyTo), + (ActorRef> replyTo) -> new SupervisorActor.Create(id, replyTo), Duration.ofSeconds(10), scheduler ); From 61a75018a7de36d4cfc147b043c18540bbf93b51 Mon Sep 17 00:00:00 2001 From: nattamon Date: Fri, 21 Nov 2025 13:29:30 -0500 Subject: [PATCH 04/18] EDIT split index.coffee to article & search file --- app/assets/javascripts/article.coffee | 37 ++++++++ app/assets/javascripts/index.coffee | 132 ++++---------------------- app/assets/javascripts/search.coffee | 62 ++++++++++++ app/views/index.scala.html | 6 ++ 4 files changed, 124 insertions(+), 113 deletions(-) create mode 100644 app/assets/javascripts/article.coffee create mode 100644 app/assets/javascripts/search.coffee diff --git a/app/assets/javascripts/article.coffee b/app/assets/javascripts/article.coffee new file mode 100644 index 0000000..71a7824 --- /dev/null +++ b/app/assets/javascripts/article.coffee @@ -0,0 +1,37 @@ +window.App ||= {} + +formatNumber = (val) -> + num = Number(val) + if isNaN(num) then "0.00" else num.toFixed(2) + +window.App.renderArticle = (article) -> + # For build the Source URL + sourceUrl = "/source/#{encodeURIComponent(article.sourceName)}?identifier=#{encodeURIComponent(article.sourceIdentifier)}" + + # Return the HTML string for ONE article + return """ +
+
+ #{article.title} +
+

+ + Source: #{article.sourceName} | + Published: #{article.publishedAtEDT} + +

+

+ + Flesch-Kincaid Grade Level: #{formatNumber(article.fleschKincaidGradeLevel)} | + Flesch Reading Ease Score: #{formatNumber(article.fleschReadingScore)} + +

+

+ + OSS Flesch-Kincaid Grade Level: #{formatNumber(article.fleschKincaidGradeLevelOSS)} | + OSS Flesch Reading Ease Score: #{formatNumber(article.fleschReadingScoreOSS)} + +

+

Description: #{article.description}

+
+ """ \ No newline at end of file diff --git a/app/assets/javascripts/index.coffee b/app/assets/javascripts/index.coffee index a9e3d58..6342d57 100644 --- a/app/assets/javascripts/index.coffee +++ b/app/assets/javascripts/index.coffee @@ -1,15 +1,12 @@ $ -> - console.log "--> Starting NotiLytics Client Script" - + console.log "--> Starting NotiLytics Client" ws = null - # Form Submit Handler + # --- Form Handler --- $("#searchForm").submit (event) -> - console.log "--> Blocking reload." event.preventDefault() - query = $("#searchbox").val() - sortBy = $("input[name='sortBy']:checked").val() + sortBy = $("input[name='sortBy']:checked").val() || "popularity" if ws and ws.readyState == WebSocket.OPEN console.log "--> Sending via WebSocket:", query, " Sort:", sortBy @@ -18,121 +15,30 @@ $ -> # clear search box $("#searchbox").val("") else - console.error "--> WebSocket is not connected! Cannot search." + console.error "WebSocket not ready" - # WebSocket Connection + # --- WebSocket Connection --- wsUrl = $("#ws-data").data("ws-url") console.log "--> Found WS URL:", wsUrl if wsUrl ws = new WebSocket(wsUrl) - ws.onopen = -> - console.log "--> WebSocket Connection OPEN" - ws.onmessage = (event) -> - # The server sends the entire Search History as a JSON List data = JSON.parse event.data - if data.type == "ping" - console.log "--> Received Keep-Alive Ping" - return - if Array.isArray(data) - console.log "--> Received History Update" - updateResults(data) - - ws.onerror = (error) -> - console.error "--> WebSocket Error:", error - else - console.error "--> Error: No WebSocket URL found in HTML #ws-data" - -# Result Rendering Logic -updateResults = (history) -> - container = $("#results-area") - container.empty() # Clear previous results - - # loop list of Search objects - for search in history - - # Build the Search Header (Sentiment, Stats, Readability) - - # Calculate Sentiment HTML - sentimentHtml = "" - if search.sentiment == ":-)" - sentimentHtml = ":-)" - else if search.sentiment == ":-(" - sentimentHtml = ":-(" - else - sentimentHtml = ":-|" - # main search part - searchHtml = $(""" -
-

Search Results

+ # pings + if data.type == "ping" then return -

- Query: #{search.rawQuery} | - Sorted by: #{search.sortBy} | - Sentiment: #{sentimentHtml} -

- -

- Average Flesch-Kincaid Reading Grade: #{search.fleschGradeLevel.toFixed(2)} | - Flesch Reading Ease Score: #{search.fleshReadingScore.toFixed(2)} -

-

- Average Flesch-Kincaid Reading Grade (OSS): #{search.fleschGradeLevelOSS.toFixed(2)} | - Flesch Reading Ease Score (OSS): #{search.fleshReadingScoreOSS.toFixed(2)} -

- -

- - Statistics for "#{search.rawQuery}" - -

- -
-
-
- """) - - # Build the Article List - articleContainer = searchHtml.find(".article-list") - - # Limit to top 10 results - articles = search.results.slice(0, 10) - - for article in articles - # Construct the Source Link URL - # Route: /source/:sourceName?identifier=:identifier - sourceUrl = "/source/#{encodeURIComponent(article.sourceName)}?identifier=#{encodeURIComponent(article.sourceIdentifier)}" - - articleCard = $(""" -
-
- #{article.title} -
-

- - Source: #{article.sourceName} | - Published: #{article.publishedAtEDT} - -

-

- - Flesch-Kincaid Grade Level: #{article.fleschKincaidGradeLevel.toFixed(2)} | - Flesch Reading Ease Score: #{article.fleschReadingScore.toFixed(2)} - -

-

- - OSS Flesch-Kincaid Grade Level: #{article.fleschKincaidGradeLevelOSS.toFixed(2)} | - OSS Flesch Reading Ease Score: #{article.fleschReadingScoreOSS.toFixed(2)} - -

-

Description: #{article.description}

-
- """) - articleContainer.append(articleCard) - - # Append the full search block to the page - container.append(searchHtml) \ No newline at end of file + # Process History List + if Array.isArray(data) + container = $("#results-area") + container.empty() + + for search in data + # Call the Search Renderer + if window.App.renderSearch + searchHtml = window.App.renderSearch(search) + container.append(searchHtml) + else + console.error "No WS URL found" \ No newline at end of file diff --git a/app/assets/javascripts/search.coffee b/app/assets/javascripts/search.coffee new file mode 100644 index 0000000..bc6b79e --- /dev/null +++ b/app/assets/javascripts/search.coffee @@ -0,0 +1,62 @@ +window.App ||= {} + +formatNumber = (val) -> + num = Number(val) + if isNaN(num) then "0.00" else num.toFixed(2) + +window.App.renderSearch = (search) -> + # Sentiment part + sentimentHtml = "" + if search.sentiment == ":-)" + sentimentHtml = ":-)" + else if search.sentiment == ":-(" + sentimentHtml = ":-(" + else + sentimentHtml = ":-|" + + # Build the Main Container +# console.log "search = ", search + searchBlock = $(""" +
+

Search Results

+ +

+ Query: #{search.rawQuery} | + Sorted by: #{search.sortBy} | + Sentiment: #{sentimentHtml} +

+ +

+ Average Flesch-Kincaid Reading Grade: #{formatNumber(search.fleschGradeLevel)} | + Flesch Reading Ease Score: #{formatNumber(search.fleshReadingScore)} +

+

+ Average Flesch-Kincaid Reading Grade (OSS): #{formatNumber(search.fleschGradeLevelOSS)} | + Flesch Reading Ease Score (OSS): #{formatNumber(search.fleshReadingScoreOSS)} +

+ +

+ + Statistics for "#{search.rawQuery}" + +

+ +
+
+ """) + + # Articles + articleContainer = searchBlock.find(".article-list") + + # Take top 10 articles + articles = search.results.slice(0, 10) + + for article in articles + # call other file to get the HTML for this article + if window.App.renderArticle + html = window.App.renderArticle(article) + articleContainer.append(html) + else + console.error "Missing ui-article.js!" + + return searchBlock \ No newline at end of file diff --git a/app/views/index.scala.html b/app/views/index.scala.html index f19f31a..dacfeab 100644 --- a/app/views/index.scala.html +++ b/app/views/index.scala.html @@ -7,5 +7,11 @@
+ @* Load the Article Renderer *@ + + @* Load the Search Renderer*@ + + + @* Load the Main Controller *@ } \ No newline at end of file From b339eaf4768e2e38bd793a1e6c99a4ed8a66857e Mon Sep 17 00:00:00 2001 From: Siming Yi Date: Tue, 25 Nov 2025 00:43:49 -0500 Subject: [PATCH 05/18] ADD SupervisorActor --- app/actors/SupervisorActor.java | 122 ++++++++++++++++++++++++-------- 1 file changed, 93 insertions(+), 29 deletions(-) diff --git a/app/actors/SupervisorActor.java b/app/actors/SupervisorActor.java index 992d7e7..361d01b 100644 --- a/app/actors/SupervisorActor.java +++ b/app/actors/SupervisorActor.java @@ -3,44 +3,108 @@ import org.apache.pekko.NotUsed; import org.apache.pekko.actor.typed.ActorRef; import org.apache.pekko.actor.typed.Behavior; +import org.apache.pekko.actor.typed.SupervisorStrategy; +import org.apache.pekko.actor.typed.Terminated; import org.apache.pekko.actor.typed.javadsl.Behaviors; import org.apache.pekko.stream.javadsl.Flow; import com.fasterxml.jackson.databind.JsonNode; import play.cache.AsyncCacheApi; import play.libs.ws.WSClient; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; + /** - * Parent actor that manages the creation of individual UserActors for each WebSocket connection. + * Parent actor that manages the creation of individual actors + * for each WebSocket connection / logical user. + * + * This actor is responsible for: + * - Creating at most one UserActor per logical user id + * - Supervising children and restarting them on failures (e.g., News API errors) + * - Returning the Flow that is attached + * to the Play WebSocket for a given user + * + * @author Siming Yi */ public final class SupervisorActor { - private SupervisorActor() {} - // Message to request the creation of a UserActor - public static final class Create { - final String id; - final ActorRef> replyTo; + private SupervisorActor() { + // static utility – no instances + } + + /** + * Message used by the controller to request (or reuse) a {@link UserActor} + * and obtain the WebSocket {@link Flow}. + */ + public static final class Create { + /** Stable identifier for the logical user / session (e.g., session token). */ + public final String id; + + /** Actor that will receive the materialized WebSocket flow. */ + public final ActorRef> replyTo; + + public Create(String id, ActorRef> replyTo) { + this.id = id; + this.replyTo = replyTo; + } + } + + /** + * Factory method for the supervisor behavior. + * + * @param wsClient WS client used by child actors when calling the News API + * @param apiKey API key for the News API + * @param asyncCache Cache used by child actors to persist user search history + * @return behavior handling {@link Create} messages + */ + public static Behavior create(WSClient wsClient, + String apiKey, + AsyncCacheApi asyncCache) { + + return Behaviors.setup(context -> { + // one UserActor per logical user id + Map> children = new HashMap<>(); + + return Behaviors + .receive(SupervisorActor.Create.class) + .onMessage(SupervisorActor.Create.class, msg -> { + ActorRef child = children.get(msg.id); + if (child == null) { + Behavior childBehavior = + Behaviors.supervise( + UserActor.create(msg.id, wsClient, apiKey, asyncCache) + ).onFailure( + Exception.class, + SupervisorStrategy + .restart() + .withLimit(3, Duration.ofMinutes(1)) + ); + + child = context.spawn(childBehavior, "userActor-" + msg.id); + context.watch(child); + children.put(msg.id, child); + context.getLog().info("Created UserActor for id={}", msg.id); + } else { + context.getLog().info("Reusing existing UserActor for id={}", msg.id); + } + + child.tell(new UserActor.Connect(msg.replyTo)); + return Behaviors.same(); + }) + .onSignal(Terminated.class, sig -> { + ActorRef ref = sig.getRef(); + + children.values().removeIf(child -> child.equals(ref)); + + context.getLog().info( + "Child actor {} terminated and was removed from supervisor registry.", + ref.path() + ); - public Create(String id, ActorRef> replyTo) { - this.id = id; - this.replyTo = replyTo; + return Behaviors.same(); + }) + .build(); + }); } - } - - // TODO rm cache - public static Behavior create(WSClient wsClient, String apiKey, AsyncCacheApi asyncCache) { - return Behaviors.setup(context -> { - return Behaviors.receive(Create.class) - .onMessage(Create.class, create -> { - // Spawn a new UserActor for this specific connection ID - ActorRef child = context.spawn( - UserActor.create(create.id, wsClient, apiKey, asyncCache), - "userActor-" + create.id - ); - // Ask the child to establish the stream - child.tell(new UserActor.Connect(create.replyTo)); - return Behaviors.same(); - }) - .build(); - }); - } -} \ No newline at end of file +} From f83b460cc543a0309aab0e05338821b488e01531 Mon Sep 17 00:00:00 2001 From: Siming Yi Date: Thu, 27 Nov 2025 16:42:59 -0500 Subject: [PATCH 06/18] ADD SentimentAnalyzerActor --- app/actors/SentimentAnalyzerActor.java | 88 ++++++++++++++++++++++++++ app/actors/UserActor.java | 65 +++++++++++++++---- 2 files changed, 141 insertions(+), 12 deletions(-) create mode 100644 app/actors/SentimentAnalyzerActor.java diff --git a/app/actors/SentimentAnalyzerActor.java b/app/actors/SentimentAnalyzerActor.java new file mode 100644 index 0000000..87f10dc --- /dev/null +++ b/app/actors/SentimentAnalyzerActor.java @@ -0,0 +1,88 @@ +package actors; + +import org.apache.pekko.actor.typed.ActorRef; +import org.apache.pekko.actor.typed.Behavior; +import org.apache.pekko.actor.typed.javadsl.Behaviors; +import services.SentimentAnalyzer; + +import java.util.List; + +/** + * Actor wrapper around {@link services.SentimentAnalyzer}. + * + * It receives article text(s), computes the sentiment using the existing + * SentimentAnalyzer service, and replies with an emoji string (":-)", ":-(", ":-|"). + * + * @author Siming Yi + */ +public final class SentimentAnalyzerActor { + + /** Marker interface for all messages this actor accepts. */ + public interface Command {} + + /** + * Request to analyze a single article. + */ + public static final class AnalyzeSingle implements Command { + public final String text; + public final ActorRef replyTo; + + public AnalyzeSingle(String text, ActorRef replyTo) { + this.text = text; + this.replyTo = replyTo; + } + } + + /** + * Request to analyze the average sentiment of multiple articles. + */ + public static final class AnalyzeAverage implements Command { + public final List texts; + public final ActorRef replyTo; + + public AnalyzeAverage(List texts, ActorRef replyTo) { + this.texts = texts; + this.replyTo = replyTo; + } + } + + /** + * Response message containing the computed sentiment symbol. + */ + public static final class Result { + public final String sentiment; + + public Result(String sentiment) { + this.sentiment = sentiment; + } + } + + private SentimentAnalyzerActor() { + // no public constructor + } + + /** + * Factory method for creating the actor behavior. + * + * @return a Behavior that processes AnalyzeSingle and AnalyzeAverage messages. + */ + public static Behavior create() { + return Behaviors.setup(context -> { + // Reuse your existing service class + SentimentAnalyzer analyzer = new SentimentAnalyzer(); + + return Behaviors.receive(Command.class) + .onMessage(AnalyzeSingle.class, msg -> { + String result = analyzer.analyzeSingleArticle(msg.text); + msg.replyTo.tell(new Result(result)); + return Behaviors.same(); + }) + .onMessage(AnalyzeAverage.class, msg -> { + String result = analyzer.analyzeAverageSentiment(msg.texts); + msg.replyTo.tell(new Result(result)); + return Behaviors.same(); + }) + .build(); + }); + } +} diff --git a/app/actors/UserActor.java b/app/actors/UserActor.java index a7c2162..48e16e1 100644 --- a/app/actors/UserActor.java +++ b/app/actors/UserActor.java @@ -5,6 +5,7 @@ import org.apache.pekko.actor.typed.ActorRef; import org.apache.pekko.actor.typed.Behavior; import org.apache.pekko.actor.typed.javadsl.ActorContext; +import org.apache.pekko.actor.typed.javadsl.AskPattern; import org.apache.pekko.actor.typed.javadsl.Behaviors; import org.apache.pekko.stream.Materializer; import org.apache.pekko.stream.javadsl.*; @@ -51,6 +52,9 @@ private static final class InternalStop implements Message {} private final ActorContext context; private final List searchHistory = new ArrayList<>(); + /** Child actor that performs sentiment analysis for this user. */ + private final ActorRef sentimentActor; + private final Sink hubSink; private final Flow websocketFlow; @@ -71,6 +75,11 @@ private UserActor(String id, WSClient ws, String apiKey, AsyncCacheApi asyncCach this.context = context; this.asyncCache = asyncCache; + this.sentimentActor = context.spawn( + SentimentAnalyzerActor.create(), + "SentimentAnalyzer-" + id + ); + Materializer mat = Materializer.matFromSystem(context.getSystem()); ActorRef self = context.getSelf(); @@ -122,18 +131,50 @@ private Behavior behavior() { Search search = new Search(msg.query, msg.sortBy, apiKey); - search.fetchResults(ws).thenAccept(v -> { - searchHistory.add(0, search); - if (searchHistory.size() > 10) searchHistory.remove(searchHistory.size() - 1); - - // TODO rm cache - asyncCache.set(id, new ArrayList<>(searchHistory)); - - // Send updated results back to the user - JsonNode response = Json.toJson(searchHistory); - Source.single(response).runWith(hubSink, Materializer.matFromSystem(context.getSystem())); - }); - return Behaviors.same(); + // call News API (as before) + search.fetchResults(ws) + // after results arrive, ask the SentimentAnalyzerActor + .thenCompose(v -> { + List texts = search.getResults().stream() + .limit(50) + .map(a -> a.getDescription() != null ? a.getDescription() : "") + .toList(); + + // empty -> neutral + if (texts.isEmpty()) { + search.setSentiment(":-|"); + return java.util.concurrent.CompletableFuture.completedFuture(null); + } + + // use ask pattern for sentiment analyze + return AskPattern.ask( + sentimentActor, + (ActorRef replyTo) -> + new SentimentAnalyzerActor.AnalyzeAverage(texts, replyTo), + Duration.ofSeconds(2), + context.getSystem().scheduler() + ).thenAccept(result -> { + search.setSentiment(result.sentiment); + }); + }) + .thenAccept(v2 -> { + searchHistory.add(0, search); + if (searchHistory.size() > 10) { + searchHistory.remove(searchHistory.size() - 1); + } + + // TODO rm cache + asyncCache.set(id, new ArrayList<>(searchHistory)); + + // Send updated results back to the user + JsonNode response = Json.toJson(searchHistory); + Source.single(response).runWith( + hubSink, + Materializer.matFromSystem(context.getSystem()) + ); + }); + + return Behaviors.same(); }) // Handle the stop message .onMessage(InternalStop.class, msg -> { From f75ed1e0372c735a7357de15c6d9e9dc2fbb1a64 Mon Sep 17 00:00:00 2001 From: nattamon Date: Tue, 25 Nov 2025 11:48:45 -0500 Subject: [PATCH 07/18] EDIT controller unit test --- test/controllers/HomeControllerTest.java | 224 ++++++++++++----------- 1 file changed, 113 insertions(+), 111 deletions(-) diff --git a/test/controllers/HomeControllerTest.java b/test/controllers/HomeControllerTest.java index 9d86278..9f03cd9 100644 --- a/test/controllers/HomeControllerTest.java +++ b/test/controllers/HomeControllerTest.java @@ -1,11 +1,15 @@ package controllers; +import actors.SupervisorActor; import com.fasterxml.jackson.databind.node.ObjectNode; import com.typesafe.config.Config; import models.Article; import models.Search; import models.Source; import models.SourcesInfo; +import org.apache.pekko.actor.Scheduler; +import org.apache.pekko.actor.typed.ActorRef; +import org.apache.pekko.actor.typed.ActorSystem; import org.junit.Before; import org.mockito.ArgumentCaptor; import org.mockito.Mock; @@ -58,6 +62,9 @@ public class HomeControllerTest extends WithApplication { /** Mock HTTP response returned from API calls. */ @Mock private WSResponse mockResponse; + @Mock private ActorSystem mockActorSystem; + @Mock private Scheduler mockScheduler; + @Mock private ActorRef mockSupervisorActor; /** * Provide application part @@ -74,7 +81,9 @@ protected Application provideApplication() { .overrides( bind(WSClient.class).toInstance(mockWsClient), bind(AsyncCacheApi.class).toInstance(mockCache), - bind(Config.class).toInstance(mockConfig) + bind(Config.class).toInstance(mockConfig), + bind(ActorSystem.class).toInstance(mockActorSystem), + bind(ActorRef.class).qualifiedWith("SupervisorActor").toInstance(mockSupervisorActor) ) .build(); } @@ -95,32 +104,25 @@ public void setupMocks() { ObjectNode emptyJson = Json.newObject(); when(mockResponse.asJson()).thenReturn(emptyJson); + + when(mockCache.get(anyString())).thenReturn(CompletableFuture.completedFuture(Optional.empty())); } /** * Renders the home page when no history is cached: expects 200, HTML welcome text, and a new session token. - * Verifies a cache lookup happens for the generated token. * @author Nattamon Paiboon */ @Test - public void testIndex_withEmptyCache() { - // Mock the cache to return an empty Optional for any string - when(mockCache.get(anyString())) - .thenReturn(CompletableFuture.completedFuture(Optional.empty())); - + public void testIndex() { Http.RequestBuilder request = new Http.RequestBuilder() .method(GET) .uri("/"); Result result = route(app, request); - // --- Assert --- assertEquals(OK, result.status()); assertEquals("text/html", result.contentType().get()); - assertTrue(contentAsString(result).contains("Welcome to NotiLytics (made by DNSL)")); - assertNotNull(result.session().get("user_token").get()); - verify(mockCache, times(1)).get(result.session().get("user_token").get()); } /** @@ -128,117 +130,117 @@ public void testIndex_withEmptyCache() { * Verifies the cache is read with the session token and that the token is present. * @author Nattamon Paiboon */ - @Test - public void testIndex_withCache() { - ArrayList mockHistory = new ArrayList<>(); - List
emptyArticleList = new ArrayList<>(); - - Search mockSearch = mock(Search.class); - when(mockSearch.getRawQuery()).thenReturn("test query"); - when(mockSearch.getSortBy()).thenReturn("publishedAt"); - when(mockSearch.getResults()).thenReturn(emptyArticleList); - - mockHistory.add(mockSearch); - - // Mock the cache to return our fake history - when(mockCache.get(anyString())) - .thenReturn(CompletableFuture.completedFuture(Optional.of(mockHistory))); - - Http.RequestBuilder request = new Http.RequestBuilder() - .method(GET) - .uri("/"); - - Result result = route(app, request); - String token = result.session().get("user_token").get(); - - assertEquals("Status should be OK", OK, result.status()); - assertEquals("Content type should be HTML", "text/html", result.contentType().get()); - assertTrue("Should render the results page", - contentAsString(result).contains("Search Results")); - verify(mockCache, times(1)).get(token); - assertNotNull("Session token should be set", token); - } +// @Test +// public void testIndex_withCache() { +// ArrayList mockHistory = new ArrayList<>(); +// List
emptyArticleList = new ArrayList<>(); +// +// Search mockSearch = mock(Search.class); +// when(mockSearch.getRawQuery()).thenReturn("test query"); +// when(mockSearch.getSortBy()).thenReturn("publishedAt"); +// when(mockSearch.getResults()).thenReturn(emptyArticleList); +// +// mockHistory.add(mockSearch); +// +// // Mock the cache to return our fake history +// when(mockCache.get(anyString())) +// .thenReturn(CompletableFuture.completedFuture(Optional.of(mockHistory))); +// +// Http.RequestBuilder request = new Http.RequestBuilder() +// .method(GET) +// .uri("/"); +// +// Result result = route(app, request); +// String token = result.session().get("user_token").get(); +// +// assertEquals("Status should be OK", OK, result.status()); +// assertEquals("Content type should be HTML", "text/html", result.contentType().get()); +// assertTrue("Should render the results page", +// contentAsString(result).contains("Search Results")); +// verify(mockCache, times(1)).get(token); +// assertNotNull("Session token should be set", token); +// } /** * Performs a search with no cached history: expects 200 and "Search Results". * Verifies the NewsAPI endpoint is called with q=test and cache is checked via the session token. * @author Nattamon Paiboon */ - @Test - public void testSearch_withEmptyCache() { - when(mockCache.get(anyString())) - .thenReturn(CompletableFuture.completedFuture(Optional.empty())); - - ObjectNode fakeArticlesJson = Json.newObject(); - fakeArticlesJson.putArray("articles"); - - when(mockResponse.asJson()).thenReturn(fakeArticlesJson); - - Http.RequestBuilder request = new Http.RequestBuilder() - .method(GET) - .uri("/search?rawQuery=test&sortBy=popularity"); - - Result result = route(app, request); - String token = result.session().get("user_token").get(); - - assertEquals(OK, result.status()); - assertTrue(contentAsString(result).contains("Search Results")); - - verify(mockWsClient, times(1)).url("https://newsapi.org/v2/everything"); - verify(mockRequest).addQueryParameter("q", "test"); - verify(mockCache, times(1)).get(token); - assertNotNull("Session token should be set", token); - } +// @Test +// public void testSearch_withEmptyCache() { +// when(mockCache.get(anyString())) +// .thenReturn(CompletableFuture.completedFuture(Optional.empty())); +// +// ObjectNode fakeArticlesJson = Json.newObject(); +// fakeArticlesJson.putArray("articles"); +// +// when(mockResponse.asJson()).thenReturn(fakeArticlesJson); +// +// Http.RequestBuilder request = new Http.RequestBuilder() +// .method(GET) +// .uri("/search?rawQuery=test&sortBy=popularity"); +// +// Result result = route(app, request); +// String token = result.session().get("user_token").get(); +// +// assertEquals(OK, result.status()); +// assertTrue(contentAsString(result).contains("Search Results")); +// +// verify(mockWsClient, times(1)).url("https://newsapi.org/v2/everything"); +// verify(mockRequest).addQueryParameter("q", "test"); +// verify(mockCache, times(1)).get(token); +// assertNotNull("Session token should be set", token); +// } /** * Adds a new search when history already has 10 items: keeps list at 10 with the new search first. * @author Nattamon Paiboon */ - @Test - public void testSearch_withCache() { - ArrayList fullHistory = new ArrayList<>(); - List
emptyArticleList = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - Search mockSearch = mock(Search.class); - - when(mockSearch.getRawQuery()).thenReturn("old_query_" + i); - when(mockSearch.getSortBy()).thenReturn("popularity"); - - when(mockSearch.getResults()).thenReturn(emptyArticleList); - - fullHistory.add(mockSearch); - } - assertEquals(10, fullHistory.size()); - - when(mockCache.get(eq("test-token-123"))) - .thenReturn(CompletableFuture.completedFuture(Optional.of(fullHistory))); - - ArgumentCaptor listCaptor = ArgumentCaptor.forClass(List.class); - - when(mockCache.set(eq("test-token-123"), listCaptor.capture(), eq(3600))) - .thenReturn(CompletableFuture.completedFuture(null)); - - ObjectNode fakeArticlesJson = Json.newObject(); - fakeArticlesJson.putArray("articles"); - when(mockResponse.asJson()).thenReturn(fakeArticlesJson); - - Http.RequestBuilder request = new Http.RequestBuilder() - .method(GET) - .uri("/search?rawQuery=new_search&sortBy=popularity") - .session("user_token", "test-token-123"); - - Result result = route(app, request); - - // --- Assert --- - assertEquals(OK, result.status()); - verify(mockCache, times(1)).set(eq("test-token-123"), any(List.class), eq(3600)); - List savedList = (List) listCaptor.getValue(); - assertEquals("History size should be capped at 10", 10, savedList.size()); - assertEquals("The new search should be first", "new_search", - savedList.getFirst().getRawQuery()); - assertEquals("The second-oldest item should now be last", "old_query_8", - savedList.getLast().getRawQuery()); - } +// @Test +// public void testSearch_withCache() { +// ArrayList fullHistory = new ArrayList<>(); +// List
emptyArticleList = new ArrayList<>(); +// for (int i = 0; i < 10; i++) { +// Search mockSearch = mock(Search.class); +// +// when(mockSearch.getRawQuery()).thenReturn("old_query_" + i); +// when(mockSearch.getSortBy()).thenReturn("popularity"); +// +// when(mockSearch.getResults()).thenReturn(emptyArticleList); +// +// fullHistory.add(mockSearch); +// } +// assertEquals(10, fullHistory.size()); +// +// when(mockCache.get(eq("test-token-123"))) +// .thenReturn(CompletableFuture.completedFuture(Optional.of(fullHistory))); +// +// ArgumentCaptor listCaptor = ArgumentCaptor.forClass(List.class); +// +// when(mockCache.set(eq("test-token-123"), listCaptor.capture(), eq(3600))) +// .thenReturn(CompletableFuture.completedFuture(null)); +// +// ObjectNode fakeArticlesJson = Json.newObject(); +// fakeArticlesJson.putArray("articles"); +// when(mockResponse.asJson()).thenReturn(fakeArticlesJson); +// +// Http.RequestBuilder request = new Http.RequestBuilder() +// .method(GET) +// .uri("/search?rawQuery=new_search&sortBy=popularity") +// .session("user_token", "test-token-123"); +// +// Result result = route(app, request); +// +// // --- Assert --- +// assertEquals(OK, result.status()); +// verify(mockCache, times(1)).set(eq("test-token-123"), any(List.class), eq(3600)); +// List savedList = (List) listCaptor.getValue(); +// assertEquals("History size should be capped at 10", 10, savedList.size()); +// assertEquals("The new search should be first", "new_search", +// savedList.getFirst().getRawQuery()); +// assertEquals("The second-oldest item should now be last", "old_query_8", +// savedList.getLast().getRawQuery()); +// } /** * Fetches sources and articles when neither sourcesInfo nor the source page is cached: expects 200. From 8b3fc5c45555b3a4d74ca8e739cc6b52f34f678f Mon Sep 17 00:00:00 2001 From: nattamon Date: Tue, 25 Nov 2025 16:14:54 -0500 Subject: [PATCH 08/18] ADD source actor --- app/actors/SourceActor.java | 127 ++++++++++++++++++++++++++++ app/actors/SourcesInfoActor.java | 108 +++++++++++++++++++++++ app/controllers/HomeController.java | 110 +++++++++++------------- 3 files changed, 285 insertions(+), 60 deletions(-) create mode 100644 app/actors/SourceActor.java create mode 100644 app/actors/SourcesInfoActor.java diff --git a/app/actors/SourceActor.java b/app/actors/SourceActor.java new file mode 100644 index 0000000..53361b1 --- /dev/null +++ b/app/actors/SourceActor.java @@ -0,0 +1,127 @@ +package actors; + +import models.Source; +import models.SourcesInfo; +import org.apache.pekko.actor.typed.ActorRef; +import org.apache.pekko.actor.typed.Behavior; +import org.apache.pekko.actor.typed.javadsl.AbstractBehavior; +import org.apache.pekko.actor.typed.javadsl.ActorContext; +import org.apache.pekko.actor.typed.javadsl.Behaviors; +import org.apache.pekko.actor.typed.javadsl.Receive; +import play.libs.ws.WSClient; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletionStage; + +/** + * Actor responsible for fetching the profile and 10 latest headlines for a specific news source. + * This actor orchestrates the creation of a Source model. It determines + * whether the requested identifier corresponds to a known News ID or a web domain. + * 1. This will receive a GetProfile message containing the target identifier and the global source list. + * 2. Then checks if the identifier exists in the global list. + * 3. Instantiates a Source model object with this metadata. + * 4. Fetches the latest 10 headlines for that source from the News API. + * 5. Replies to the sender with the Source model object. + * + * @author Nattamon Paiboon + */ +public class SourceActor extends AbstractBehavior { + /** HTTP client used to call external APIs (e.g., NewsAPI) and fetch JSON responses. */ + private final WSClient wsClient; + /** Cache to store map of source profile */ + private final Map cache = new HashMap<>(); + + /** Message interface */ + public interface Command {} + + /** + * Message to request a profile for a specific news source. + */ + public static class GetProfile implements Command { + /** The ID or domain to search */ + public final String identifier; + /** The display name of the source. */ + public final String sourceName; + /** The global list of sources information*/ + public final SourcesInfo sourcesInfo; + /** API key read from configuration and attached to outbound API requests. */ + public final String apiKey; + /** response of this message */ + public final ActorRef replyTo; + + public GetProfile(String identifier, String sourceName, SourcesInfo sourcesInfo, String apiKey, ActorRef replyTo) { + this.identifier = identifier; + this.sourceName = sourceName; + this.sourcesInfo = sourcesInfo; + this.apiKey = apiKey; + this.replyTo = replyTo; + } + } + + /** Response interface */ + public interface Response {} + /** + * Response containing the source profile with articles. + */ + public static class ProfileLoaded implements Response { + public final Source source; + public ProfileLoaded(Source source) { + this.source = source; + } + } + + public static class ProfileFailed implements Response { + public final String error; + public ProfileFailed(String error) { + this.error = error; + } + } + + public static Behavior create(WSClient wsClient) { + return Behaviors.setup(context -> new SourceActor(context, wsClient)); + } + + private SourceActor(ActorContext context, WSClient wsClient) { + super(context); + this.wsClient = wsClient; + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(GetProfile.class, this::onGetProfile) + .build(); + } + + private Behavior onGetProfile(GetProfile msg) { + // check cachr for source article first + if (cache.containsKey(msg.identifier)) { + getContext().getLog().info("Returning cached profile for: {}", msg.identifier); + msg.replyTo.tell(new ProfileLoaded(cache.get(msg.identifier))); + return this; + } + + getContext().getLog().info("Actor processing profile for source: {}", msg.identifier); + + SourcesInfo.SourceInfo info = null; + if (msg.sourcesInfo != null) { + info = msg.sourcesInfo.getSourceInfo(msg.identifier); + } + + Source sourceModel = new Source(msg.apiKey, msg.identifier, msg.sourceName, info); + + CompletionStage future = sourceModel.fetchSourceResult(wsClient); + + future.whenComplete((ignored, error) -> { + if (error != null) { + msg.replyTo.tell(new ProfileFailed(error.getMessage())); + } else { + cache.put(msg.identifier, sourceModel); // store source in cache + msg.replyTo.tell(new ProfileLoaded(sourceModel)); + } + }); + + return this; + } +} \ No newline at end of file diff --git a/app/actors/SourcesInfoActor.java b/app/actors/SourcesInfoActor.java new file mode 100644 index 0000000..ba5a3bb --- /dev/null +++ b/app/actors/SourcesInfoActor.java @@ -0,0 +1,108 @@ +package actors; + +import models.SourcesInfo; +import org.apache.pekko.actor.typed.ActorRef; +import org.apache.pekko.actor.typed.Behavior; +import org.apache.pekko.actor.typed.javadsl.AbstractBehavior; +import org.apache.pekko.actor.typed.javadsl.ActorContext; +import org.apache.pekko.actor.typed.javadsl.Behaviors; +import org.apache.pekko.actor.typed.javadsl.Receive; +import play.libs.ws.WSClient; + +import java.util.concurrent.CompletionStage; + +/** + * Actor responsible for fetching and caching the list of all available news sources from the News API. + * It fetchss the list of all avaiable publishers from /v2/top-headlines/sources endpoint. + * To minimize API calls, this actor also maintains internal state at cachedInfo, + * once the list is successfully fetched for the first time, it stored in this cachedInfo. + * @author Nattamon Paiboon + */ +public class SourcesInfoActor extends AbstractBehavior { + /** HTTP client used to call external APIs (e.g., NewsAPI) and fetch JSON responses. */ + private final WSClient wsClient; + /** API key read from configuration and attached to outbound API requests. */ + private final String apiKey; + /** Cache to store list of sources information from the News Api */ + private SourcesInfo cachedInfo = null; + + /** Message interface */ + public interface Command {} + + /** + * Message to request the list of all news sources. + * The actor will reply with either SourcesInfoLoaded or SourcesInfoFailed. + */ + public static class GetSourcesInfo implements Command { + public final ActorRef replyTo; + + public GetSourcesInfo(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + + /** Response interface for response message */ + public interface Response {} + + /** + * Response message containing the successfully fetched (or cached) list of sources. + */ + public static class SourcesInfoLoaded implements Response { + public final SourcesInfo sourcesInfo; + public SourcesInfoLoaded(SourcesInfo sourcesInfo) { + this.sourcesInfo = sourcesInfo; + } + } + + /** + * Response message indicating that the API call to fetch sources failed. + */ + public static class SourcesInfoFailed implements Response { + public final String error; + public SourcesInfoFailed(String error) { + this.error = error; + } + } + + public static Behavior create(WSClient wsClient, String apiKey) { + return Behaviors.setup(context -> new SourcesInfoActor(context, wsClient, apiKey)); + } + + private SourcesInfoActor(ActorContext context, WSClient wsClient, String apiKey) { + super(context); + this.wsClient = wsClient; + this.apiKey = apiKey; + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(GetSourcesInfo.class, this::onGetSourcesInfo) + .build(); + } + + private Behavior onGetSourcesInfo(GetSourcesInfo msg) { + // check cache first + if (this.cachedInfo != null) { + getContext().getLog().info("Returning cached SourcesInfo"); + msg.replyTo.tell(new SourcesInfoLoaded(this.cachedInfo)); + return this; + } + + getContext().getLog().info("SourcesInfoActor processing request for all sources."); + + SourcesInfo sourcesInfoModel = new SourcesInfo(this.apiKey); + CompletionStage future = sourcesInfoModel.fetchSourcesInfo(wsClient); + + future.whenComplete((ignored, error) -> { + if (error != null) { + msg.replyTo.tell(new SourcesInfoFailed(error.getMessage())); + } else { + this.cachedInfo = sourcesInfoModel; // store sourceInfo in cache + msg.replyTo.tell(new SourcesInfoLoaded(sourcesInfoModel)); + } + }); + + return this; + } +} \ No newline at end of file diff --git a/app/controllers/HomeController.java b/app/controllers/HomeController.java index b92bd55..d15bf23 100644 --- a/app/controllers/HomeController.java +++ b/app/controllers/HomeController.java @@ -1,5 +1,7 @@ package controllers; +import actors.SourceActor; +import actors.SourcesInfoActor; import actors.SupervisorActor; import com.fasterxml.jackson.databind.JsonNode; import com.typesafe.config.Config; @@ -47,6 +49,11 @@ public class HomeController extends Controller { private final ActorSystem actorSystem; private final ActorRef userParentActor; + /** Actor of the source profile */ + private final ActorRef sourceActor; + /** Actor of the sources information */ + private final ActorRef sourcesInfoActor; + /** * Creates a HomeController with the required web client, caching system, and API key. * @@ -66,13 +73,12 @@ public HomeController(WSClient ws, AsyncCacheApi asyncCache, Config config, Acto this.userParentActor = actorSystem.systemActorOf( SupervisorActor.create(ws, apiKey, asyncCache), "UserParentActor", Props.empty() ); + this.sourceActor = actorSystem.systemActorOf(SourceActor.create(ws), "sourceActor", Props.empty()); + this.sourcesInfoActor = actorSystem.systemActorOf(SourcesInfoActor.create(ws, apiKey), "sourcesInfoActor", Props.empty()); } /** - * Handles requests to the home page. - * Retrieves the user's search history from the cache using a session token. - * If the history is empty, renders the index page with a search bar, - * If not, renders the results page showing previous searches. Ensures the session token is set. + * Handles requests to the home page, render index page. * * @author Luan Tran * @param request the HTTP request containing session information @@ -146,64 +152,48 @@ public WebSocket ws() { // }); // } - /** - * This action is performed when the source page is called. It retrieves the cache of all sources' information and calls fetchSourcePage - * if there is no sourcesInfo in cache, then it calls fetchSourcesInfo to get all the sources' information - * @author Nattamon Paiboon - * @param identifier - used as an id of the source, it could be sourceID or domains (is sourceID is null) of the source - * @param sourceName - the official name of the source - * @return a CompletionStage calling fetchSourcePage - */ + /** + * This action is performed when the source page is called. It asks to get sourceInfo from sourceInfoActor first, + * after than ask for source profile (sourceInfo + article) from sourceActor. + * @param identifier - used as an id of the source, it could be sourceID or domains (is sourceID is null) of the source + * @param sourceName - the official name of the source + * @return a CompletionStage, render the source page + * @author Nattamon Paiboon + */ public CompletionStage source(String identifier, String sourceName) { - return asyncCache.get("sourcesInfo").thenCompose(optional -> { - if (optional.isPresent()) { - SourcesInfo sourcesInfo = (SourcesInfo) optional.get(); - return fetchSourcePage(identifier, sourceName, sourcesInfo); - } else { - SourcesInfo newSourcesInfo = new SourcesInfo(apiKey); - return newSourcesInfo.fetchSourcesInfo(ws) - .thenCompose(v -> asyncCache.set("sourcesInfo", newSourcesInfo)) - .thenCompose(v -> fetchSourcePage(identifier, sourceName, newSourcesInfo)); - } - }); - } + // Ask SourcesInfoActor for the list of all sources + return AskPattern.ask( + sourcesInfoActor, + (ActorRef replyTo) -> new SourcesInfoActor.GetSourcesInfo(replyTo), + Duration.ofSeconds(10), + actorSystem.scheduler() + ).thenCompose(infoResponse -> { + + // Extract the SourcesInfo object + SourcesInfo fetchedInfo = null; + if (infoResponse instanceof SourcesInfoActor.SourcesInfoLoaded) { + fetchedInfo = ((SourcesInfoActor.SourcesInfoLoaded) infoResponse).sourcesInfo; + } + + final SourcesInfo sourcesInfo = fetchedInfo; + + // Ask SourceActor for the profile & passing the sourcesInfo + return AskPattern.ask( + sourceActor, + (ActorRef replyTo) -> + new SourceActor.GetProfile(identifier, sourceName, sourcesInfo, this.apiKey, replyTo), + Duration.ofSeconds(10), + actorSystem.scheduler() + ); - /** - * This method get source information with fetch the 10 lastest article of the source. - * - * Checking cache first, if there is source page info in cache, then render using cache data - * If there is no cache, then create new source and add to cache. - * It gets source information of the specific source, using identifier (sourceID), creates a new source object - * which will fetch article of this source, and then render the source page of this source. - * - * @author Nattmon Paiboon - * @param identifier - used as an id of the source, it could be sourceID or domains (is sourceID is null) of the source - * @param sourceName - the official name of the source - * @param sourcesInfo - cache of all source information - * @return a CompletionStage rendering the source page of the source - */ - private CompletionStage fetchSourcePage(String identifier, String sourceName, SourcesInfo sourcesInfo) { - String cacheKey = "source_" + identifier; - - return asyncCache.get(cacheKey).thenCompose(optionalSource -> { - if (optionalSource.isPresent()) { - // with cache: render page with source info in cache - Source cachedSource = (Source) optionalSource.get(); - return CompletableFuture.completedFuture(ok(views.html.sources.render(cachedSource))); - } else { - // no cache: We need to fetch it - SourcesInfo.SourceInfo sourceInfo = sourcesInfo.getSourceInfo(identifier); - Source newSource = new Source(apiKey, identifier, sourceName, sourceInfo); - - return newSource.fetchSourceResult(this.ws) - .thenCompose(v -> { - return asyncCache.set(cacheKey, newSource, 3600); - }) - .thenApply(v -> { - return ok(views.html.sources.render(newSource)); - }); - } - }); + }).thenApply(response -> { + if (response instanceof SourceActor.ProfileLoaded) { + return ok(views.html.sources.render(((SourceActor.ProfileLoaded) response).source)); + } else { + String error = ((SourceActor.ProfileFailed) response).error; + return internalServerError("Error fetching source: " + error); + } + }); } /** From 1cf5c5e1141416426956a56e18e73cb07b990908 Mon Sep 17 00:00:00 2001 From: nattamon Date: Wed, 26 Nov 2025 22:07:39 -0500 Subject: [PATCH 09/18] ADD unit test for controller & source & sourceInfo Actor --- app/actors/SourcesInfoActor.java | 2 +- app/controllers/HomeController.java | 2 - build.sbt | 1 + test/actors/SourceActorTest.java | 129 ++++++++++ test/actors/SourcesInfoActorTest.java | 135 +++++++++++ test/controllers/HomeControllerTest.java | 295 +++++++++-------------- 6 files changed, 377 insertions(+), 187 deletions(-) create mode 100644 test/actors/SourceActorTest.java create mode 100644 test/actors/SourcesInfoActorTest.java diff --git a/app/actors/SourcesInfoActor.java b/app/actors/SourcesInfoActor.java index ba5a3bb..815214e 100644 --- a/app/actors/SourcesInfoActor.java +++ b/app/actors/SourcesInfoActor.java @@ -13,7 +13,7 @@ /** * Actor responsible for fetching and caching the list of all available news sources from the News API. - * It fetchss the list of all avaiable publishers from /v2/top-headlines/sources endpoint. + * It fetches the list of all available publishers from /v2/top-headlines/sources endpoint. * To minimize API calls, this actor also maintains internal state at cachedInfo, * once the list is successfully fetched for the first time, it stored in this cachedInfo. * @author Nattamon Paiboon diff --git a/app/controllers/HomeController.java b/app/controllers/HomeController.java index d15bf23..eafdc80 100644 --- a/app/controllers/HomeController.java +++ b/app/controllers/HomeController.java @@ -16,11 +16,9 @@ import javax.inject.Inject; import java.time.Duration; import java.util.*; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import models.Search; -import models.Source; import models.StatsInfo; import play.libs.ws.*; diff --git a/build.sbt b/build.sbt index f6cd46e..d5d2b3c 100644 --- a/build.sbt +++ b/build.sbt @@ -17,6 +17,7 @@ libraryDependencies ++= Seq( "org.junit.platform" % "junit-platform-launcher" % "1.10.2" % Test, "io.whelk.flesch.kincaid" % "whelk-flesch-kincaid" % "0.1.11", "org.mockito" % "mockito-core" % "5.11.0" % Test, + "org.apache.pekko" %% "pekko-actor-testkit-typed" % "1.3.0" % Test, ) diff --git a/test/actors/SourceActorTest.java b/test/actors/SourceActorTest.java new file mode 100644 index 0000000..f2aea36 --- /dev/null +++ b/test/actors/SourceActorTest.java @@ -0,0 +1,129 @@ +package actors; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import models.SourcesInfo; +import org.apache.pekko.actor.testkit.typed.javadsl.ActorTestKit; +import org.apache.pekko.actor.testkit.typed.javadsl.TestProbe; +import org.apache.pekko.actor.typed.ActorRef; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import play.libs.Json; +import play.libs.ws.WSClient; +import play.libs.ws.WSRequest; +import play.libs.ws.WSResponse; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.*; + +/** + * Unit tests for SourceActor using Pekko TestKit. + * @author nattamon paiboon + */ +public class SourceActorTest { + + static final ActorTestKit testKit = ActorTestKit.create(); + + @Mock private WSClient mockWsClient; + @Mock private WSRequest mockRequest; + @Mock private WSResponse mockResponse; + @Mock private SourcesInfo mockSourcesInfo; + + @AfterAll + public static void teardown() { + testKit.shutdownTestKit(); + } + + @BeforeEach + public void setup() { + MockitoAnnotations.openMocks(this); + } + + /** + * Test successful fetching of a profile. + */ + @Test + public void testGetProfile_Success() { + // Prepare Mock API Response + ObjectNode json = Json.newObject(); + json.putArray("articles"); // Empty articles for simplicity + + when(mockWsClient.url(anyString())).thenReturn(mockRequest); + when(mockRequest.setRequestTimeout(any())).thenReturn(mockRequest); + when(mockRequest.addQueryParameter(anyString(), anyString())).thenReturn(mockRequest); + when(mockRequest.get()).thenReturn(CompletableFuture.completedFuture(mockResponse)); + when(mockResponse.getStatus()).thenReturn(200); + when(mockResponse.asJson()).thenReturn(json); + + // Spawn Actor + ActorRef actor = testKit.spawn(SourceActor.create(mockWsClient)); + TestProbe probe = testKit.createTestProbe(); + + // Send Message + actor.tell(new SourceActor.GetProfile("test-id", "Test Name", mockSourcesInfo, "key", probe.getRef())); + + // Assert Response + SourceActor.ProfileLoaded response = probe.expectMessageClass(SourceActor.ProfileLoaded.class); + assertNotNull(response.source); + assertEquals("Test Name", response.source.getSourceName()); + } + + /** + * Test handling of API failure + */ + @Test + public void testGetProfile_Failure() { + // Mock Failure Response + when(mockWsClient.url(anyString())).thenReturn(mockRequest); + when(mockRequest.setRequestTimeout(any())).thenReturn(mockRequest); + when(mockRequest.addQueryParameter(anyString(), anyString())).thenReturn(mockRequest); + when(mockRequest.get()).thenReturn(CompletableFuture.completedFuture(mockResponse)); + when(mockResponse.getStatus()).thenReturn(400); // Error code + + ActorRef actor = testKit.spawn(SourceActor.create(mockWsClient)); + TestProbe probe = testKit.createTestProbe(); + + actor.tell(new SourceActor.GetProfile("bad-id", "Test", mockSourcesInfo, "key", probe.getRef())); + + // Assert Error Response + SourceActor.ProfileFailed response = probe.expectMessageClass(SourceActor.ProfileFailed.class); + assertNotNull(response.error); + } + + /** + * Test that the actor caches results and doesn't call API for the same ID. + */ + @Test + public void testGetProfile_Caching() { + // Prepare Mock API Response (Success) + ObjectNode json = Json.newObject(); + json.putArray("articles"); + + when(mockWsClient.url(anyString())).thenReturn(mockRequest); + when(mockRequest.setRequestTimeout(any())).thenReturn(mockRequest); + when(mockRequest.addQueryParameter(anyString(), anyString())).thenReturn(mockRequest); + when(mockRequest.get()).thenReturn(CompletableFuture.completedFuture(mockResponse)); + when(mockResponse.getStatus()).thenReturn(200); + when(mockResponse.asJson()).thenReturn(json); + + ActorRef actor = testKit.spawn(SourceActor.create(mockWsClient)); + TestProbe probe = testKit.createTestProbe(); + + actor.tell(new SourceActor.GetProfile("cache-id", "Test", null, "key", probe.getRef())); + probe.expectMessageClass(SourceActor.ProfileLoaded.class); + + actor.tell(new SourceActor.GetProfile("cache-id", "Test", null, "key", probe.getRef())); + probe.expectMessageClass(SourceActor.ProfileLoaded.class); + + verify(mockWsClient, times(1)).url(anyString()); + verify(mockRequest, times(1)).get(); + } +} \ No newline at end of file diff --git a/test/actors/SourcesInfoActorTest.java b/test/actors/SourcesInfoActorTest.java new file mode 100644 index 0000000..b10223e --- /dev/null +++ b/test/actors/SourcesInfoActorTest.java @@ -0,0 +1,135 @@ +package actors; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.pekko.actor.testkit.typed.javadsl.ActorTestKit; +import org.apache.pekko.actor.testkit.typed.javadsl.TestProbe; +import org.apache.pekko.actor.typed.ActorRef; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import play.libs.Json; +import play.libs.ws.WSClient; +import play.libs.ws.WSRequest; +import play.libs.ws.WSResponse; + +import java.util.concurrent.CompletableFuture; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.*; + +/** + * Unit tests for SourcesInfoActor using Pekko TestKit. + * @author nattamon paiboon + */ +public class SourcesInfoActorTest { + + static final ActorTestKit testKit = ActorTestKit.create(); + + @Mock private WSClient mockWsClient; + @Mock private WSRequest mockRequest; + @Mock private WSResponse mockResponse; + + @AfterAll + public static void teardown() { + testKit.shutdownTestKit(); + } + + @BeforeEach + public void setup() { + MockitoAnnotations.openMocks(this); + } + + /** + * Test successful fetching of list of sources information. + */ + @Test + public void testGetSourcesInfo_Success() { + // Setup successful JSON response + ObjectNode json = Json.newObject(); + ObjectNode sourceNode = json.putArray("sources").addObject(); + sourceNode.put("id", "abc"); + sourceNode.put("name", "ABC News"); + sourceNode.put("description", "Desc"); + sourceNode.put("url", "url"); + sourceNode.put("category", "gen"); + sourceNode.put("language", "en"); + sourceNode.put("country", "us"); + + when(mockWsClient.url(anyString())).thenReturn(mockRequest); + when(mockRequest.setRequestTimeout(any())).thenReturn(mockRequest); + when(mockRequest.addQueryParameter(anyString(), anyString())).thenReturn(mockRequest); + when(mockRequest.get()).thenReturn(CompletableFuture.completedFuture(mockResponse)); + when(mockResponse.getStatus()).thenReturn(200); + when(mockResponse.asJson()).thenReturn(json); + + ActorRef actor = testKit.spawn(SourcesInfoActor.create(mockWsClient, "key")); + TestProbe probe = testKit.createTestProbe(); + + // Act + actor.tell(new SourcesInfoActor.GetSourcesInfo(probe.getRef())); + + // Assert + SourcesInfoActor.SourcesInfoLoaded response = probe.expectMessageClass(SourcesInfoActor.SourcesInfoLoaded.class); + assertNotNull(response.sourcesInfo); + assertEquals(1, response.sourcesInfo.getSourcesInfo().size()); + assertEquals("ABC News", response.sourcesInfo.getSourcesInfo().get(0).getName()); + } + + /** + * Test handling of API failure + */ + @Test + public void testGetSourcesInfo_Failure() { + // Setup failure + when(mockWsClient.url(anyString())).thenReturn(mockRequest); + when(mockRequest.setRequestTimeout(any())).thenReturn(mockRequest); + when(mockRequest.addQueryParameter(anyString(), anyString())).thenReturn(mockRequest); + when(mockRequest.get()).thenReturn(CompletableFuture.completedFuture(mockResponse)); + when(mockResponse.getStatus()).thenReturn(500); // Server error + + ActorRef actor = testKit.spawn(SourcesInfoActor.create(mockWsClient, "key")); + TestProbe probe = testKit.createTestProbe(); + + actor.tell(new SourcesInfoActor.GetSourcesInfo(probe.getRef())); + + SourcesInfoActor.SourcesInfoFailed response = probe.expectMessageClass(SourcesInfoActor.SourcesInfoFailed.class); + assertNotNull(response.error); + } + + /** + * Test if already have cache for the list of sources information + */ + @Test + public void testGetSourcesInfo_Caching() { + // Setup Success + ObjectNode json = Json.newObject(); + json.putArray("sources"); + + when(mockWsClient.url(anyString())).thenReturn(mockRequest); + when(mockRequest.setRequestTimeout(any())).thenReturn(mockRequest); + when(mockRequest.addQueryParameter(anyString(), anyString())).thenReturn(mockRequest); + when(mockRequest.get()).thenReturn(CompletableFuture.completedFuture(mockResponse)); + when(mockResponse.getStatus()).thenReturn(200); + when(mockResponse.asJson()).thenReturn(json); + + ActorRef actor = testKit.spawn(SourcesInfoActor.create(mockWsClient, "key")); + TestProbe probe = testKit.createTestProbe(); + + // 1st Call + actor.tell(new SourcesInfoActor.GetSourcesInfo(probe.getRef())); + probe.expectMessageClass(SourcesInfoActor.SourcesInfoLoaded.class); + + // 2nd Call + actor.tell(new SourcesInfoActor.GetSourcesInfo(probe.getRef())); + probe.expectMessageClass(SourcesInfoActor.SourcesInfoLoaded.class); + + // API should be called only once + verify(mockWsClient, times(1)).url(anyString()); + verify(mockRequest, times(1)).get(); + } +} \ No newline at end of file diff --git a/test/controllers/HomeControllerTest.java b/test/controllers/HomeControllerTest.java index 9f03cd9..b075e52 100644 --- a/test/controllers/HomeControllerTest.java +++ b/test/controllers/HomeControllerTest.java @@ -122,215 +122,142 @@ public void testIndex() { assertEquals(OK, result.status()); assertEquals("text/html", result.contentType().get()); - + assertTrue(contentAsString(result).contains("NotiLytics")); } - /** - * Renders the results page when history exists in cache: expects 200 and "Search Results". - * Verifies the cache is read with the session token and that the token is present. - * @author Nattamon Paiboon - */ -// @Test -// public void testIndex_withCache() { -// ArrayList mockHistory = new ArrayList<>(); -// List
emptyArticleList = new ArrayList<>(); -// -// Search mockSearch = mock(Search.class); -// when(mockSearch.getRawQuery()).thenReturn("test query"); -// when(mockSearch.getSortBy()).thenReturn("publishedAt"); -// when(mockSearch.getResults()).thenReturn(emptyArticleList); -// -// mockHistory.add(mockSearch); -// -// // Mock the cache to return our fake history -// when(mockCache.get(anyString())) -// .thenReturn(CompletableFuture.completedFuture(Optional.of(mockHistory))); -// -// Http.RequestBuilder request = new Http.RequestBuilder() -// .method(GET) -// .uri("/"); -// -// Result result = route(app, request); -// String token = result.session().get("user_token").get(); -// -// assertEquals("Status should be OK", OK, result.status()); -// assertEquals("Content type should be HTML", "text/html", result.contentType().get()); -// assertTrue("Should render the results page", -// contentAsString(result).contains("Search Results")); -// verify(mockCache, times(1)).get(token); -// assertNotNull("Session token should be set", token); -// } + /** + * Test successfully connected to the web socket + * @throws Exception + * @author Nattamon Paiboon + */ + @Test + public void testWs() throws Exception { + // Get Controller with injected mocks + HomeController controller = app.injector().instanceOf(HomeController.class); - /** - * Performs a search with no cached history: expects 200 and "Search Results". - * Verifies the NewsAPI endpoint is called with q=test and cache is checked via the session token. - * @author Nattamon Paiboon - */ -// @Test -// public void testSearch_withEmptyCache() { -// when(mockCache.get(anyString())) -// .thenReturn(CompletableFuture.completedFuture(Optional.empty())); -// -// ObjectNode fakeArticlesJson = Json.newObject(); -// fakeArticlesJson.putArray("articles"); -// -// when(mockResponse.asJson()).thenReturn(fakeArticlesJson); -// -// Http.RequestBuilder request = new Http.RequestBuilder() -// .method(GET) -// .uri("/search?rawQuery=test&sortBy=popularity"); -// -// Result result = route(app, request); -// String token = result.session().get("user_token").get(); -// -// assertEquals(OK, result.status()); -// assertTrue(contentAsString(result).contains("Search Results")); -// -// verify(mockWsClient, times(1)).url("https://newsapi.org/v2/everything"); -// verify(mockRequest).addQueryParameter("q", "test"); -// verify(mockCache, times(1)).get(token); -// assertNotNull("Session token should be set", token); -// } + Http.Request request = new Http.RequestBuilder() + .session("user_token", "test-ws-user") + .build(); - /** - * Adds a new search when history already has 10 items: keeps list at 10 with the new search first. - * @author Nattamon Paiboon - */ -// @Test -// public void testSearch_withCache() { -// ArrayList fullHistory = new ArrayList<>(); -// List
emptyArticleList = new ArrayList<>(); -// for (int i = 0; i < 10; i++) { -// Search mockSearch = mock(Search.class); -// -// when(mockSearch.getRawQuery()).thenReturn("old_query_" + i); -// when(mockSearch.getSortBy()).thenReturn("popularity"); -// -// when(mockSearch.getResults()).thenReturn(emptyArticleList); -// -// fullHistory.add(mockSearch); -// } -// assertEquals(10, fullHistory.size()); -// -// when(mockCache.get(eq("test-token-123"))) -// .thenReturn(CompletableFuture.completedFuture(Optional.of(fullHistory))); -// -// ArgumentCaptor listCaptor = ArgumentCaptor.forClass(List.class); -// -// when(mockCache.set(eq("test-token-123"), listCaptor.capture(), eq(3600))) -// .thenReturn(CompletableFuture.completedFuture(null)); -// -// ObjectNode fakeArticlesJson = Json.newObject(); -// fakeArticlesJson.putArray("articles"); -// when(mockResponse.asJson()).thenReturn(fakeArticlesJson); -// -// Http.RequestBuilder request = new Http.RequestBuilder() -// .method(GET) -// .uri("/search?rawQuery=new_search&sortBy=popularity") -// .session("user_token", "test-token-123"); -// -// Result result = route(app, request); -// -// // --- Assert --- -// assertEquals(OK, result.status()); -// verify(mockCache, times(1)).set(eq("test-token-123"), any(List.class), eq(3600)); -// List savedList = (List) listCaptor.getValue(); -// assertEquals("History size should be capped at 10", 10, savedList.size()); -// assertEquals("The new search should be first", "new_search", -// savedList.getFirst().getRawQuery()); -// assertEquals("The second-oldest item should now be last", "old_query_8", -// savedList.getLast().getRawQuery()); -// } + play.mvc.WebSocket ws = controller.ws(); - /** - * Fetches sources and articles when neither sourcesInfo nor the source page is cached: expects 200. - * @author Nattamon Paiboon - */ + // Trigger the handshake + var future = ws.apply(request).toCompletableFuture(); + + var result = future.get(5, java.util.concurrent.TimeUnit.SECONDS); + + // Verify we got a Flow + assertTrue("WebSocket handshake should return Right(Flow)", result.right.isPresent()); + org.junit.Assert.assertNotNull(result.right.get()); + } + + /** + * Test Success Scenario: + * - SourcesInfo API returns valid source data. + * - Article API returns valid articles. + * - Controller should render the view (HTTP 200). + * @author Nattamon Paiboon + */ @Test - public void testSource_withoutCache() { - // Mock cache "sourcesInfo" - when(mockCache.get(eq("sourcesInfo"))) - .thenReturn(CompletableFuture.completedFuture(Optional.empty())); - when(mockCache.set(eq("sourcesInfo"), any(SourcesInfo.class))) - .thenReturn(CompletableFuture.completedFuture(null)); - - // Mock cache test source ("source_test") - when(mockCache.get(eq("source_test"))) - .thenReturn(CompletableFuture.completedFuture(Optional.empty())); - when(mockCache.set(eq("source_test"), any(Source.class), eq(3600))) - .thenReturn(CompletableFuture.completedFuture(null)); - - WSRequest mockSourcesRequest = mock(WSRequest.class); - WSResponse mockSourcesResponse = mock(WSResponse.class); - ObjectNode fakeSourcesJson = Json.newObject(); - fakeSourcesJson.putArray("sources"); - when(mockWsClient.url(eq("https://newsapi.org/v2/top-headlines/sources"))).thenReturn(mockSourcesRequest); - when(mockSourcesRequest.setRequestTimeout(any(Duration.class))).thenReturn(mockSourcesRequest); - when(mockSourcesRequest.addQueryParameter(eq("apiKey"), anyString())).thenReturn(mockSourcesRequest); - when(mockSourcesRequest.get()).thenReturn(CompletableFuture.completedFuture(mockSourcesResponse)); - when(mockSourcesResponse.getStatus()).thenReturn(200); - when(mockSourcesResponse.asJson()).thenReturn(fakeSourcesJson); - - ObjectNode fakeArticlesJson = Json.newObject(); - fakeArticlesJson.putArray("articles"); - when(mockWsClient.url(eq("https://newsapi.org/v2/everything"))).thenReturn(mockRequest); - when(mockResponse.asJson()).thenReturn(fakeArticlesJson); - when(mockResponse.getStatus()).thenReturn(200); + public void testSource_Success() { + // JSON for SourcesInfoActor (list of sources) + ObjectNode sourcesJson = Json.newObject(); + var sourcesArray = sourcesJson.putArray("sources"); + var sourceObj = sourcesArray.addObject(); + sourceObj.put("id", "cnn"); + sourceObj.put("name", "CNN"); + sourceObj.put("description", "CNN News"); + sourceObj.put("url", "http://cnn.com"); + sourceObj.put("category", "general"); + sourceObj.put("language", "en"); + sourceObj.put("country", "us"); + + + // JSON for SourceActor (list of articles) + ObjectNode articlesJson = Json.newObject(); + var articlesArray = articlesJson.putArray("articles"); + var articleObj = articlesArray.addObject(); + articleObj.put("title", "Mock article"); + articleObj.put("url", "http://cnn.com/mock"); + articleObj.put("publishedAt", "2025-01-01T12:00:00Z"); + articleObj.put("content", "mock"); + articleObj.put("description", "mock"); + var articleSource = articleObj.putObject("source"); + articleSource.put("id", "cnn"); + articleSource.put("name", "CNN"); + + // Mock request for Sources List + WSRequest sourcesRequest = mock(WSRequest.class); + WSResponse sourcesResponse = mock(WSResponse.class); + when(sourcesResponse.getStatus()).thenReturn(200); + when(sourcesResponse.asJson()).thenReturn(sourcesJson); + when(sourcesRequest.get()).thenReturn(CompletableFuture.completedFuture(sourcesResponse)); + when(sourcesRequest.setRequestTimeout(any())).thenReturn(sourcesRequest); + when(sourcesRequest.addQueryParameter(anyString(), anyString())).thenReturn(sourcesRequest); + + // Mock request for Articles + WSRequest articlesRequest = mock(WSRequest.class); + WSResponse articlesResponse = mock(WSResponse.class); + when(articlesResponse.getStatus()).thenReturn(200); + when(articlesResponse.asJson()).thenReturn(articlesJson); + when(articlesRequest.get()).thenReturn(CompletableFuture.completedFuture(articlesResponse)); + when(articlesRequest.setRequestTimeout(any())).thenReturn(articlesRequest); + when(articlesRequest.addQueryParameter(anyString(), anyString())).thenReturn(articlesRequest); + + // Bind URLs to requests + when(mockWsClient.url("https://newsapi.org/v2/top-headlines/sources")).thenReturn(sourcesRequest); + when(mockWsClient.url("https://newsapi.org/v2/everything")).thenReturn(articlesRequest); Http.RequestBuilder request = new Http.RequestBuilder() .method(GET) - .uri("/source/Test?identifier=test"); + .uri("/source/cnn?identifier=cnn"); Result result = route(app, request); - // --- Assert --- - assertEquals("Status should be OK", OK, result.status()); - - // Verify sourcesInfo in source method - verify(mockCache, times(1)).get("sourcesInfo"); - verify(mockCache, times(1)).set(eq("sourcesInfo"), any(SourcesInfo.class)); - verify(mockWsClient, times(1)).url(eq("https://newsapi.org/v2/top-headlines/sources")); - verify(mockSourcesRequest, times(1)).get(); + assertEquals(OK, result.status()); + String content = contentAsString(result); - // Verify fetchSourcePage method - verify(mockCache, times(1)).get("source_test"); - verify(mockWsClient, times(1)).url(eq("https://newsapi.org/v2/everything")); - verify(mockCache, times(1)).set(eq("source_test"), any(Source.class), eq(3600)); + // Check if the HTML rendered the data from the mock + assertTrue("Should display source name", content.contains("CNN")); + assertTrue("Should display article title", content.contains("Mock article")); } + /** - * Tests the use of cached sourcesInfo and source data: expects 200 with no external API calls. + * Test Failure Scenario: + * - SourceInfo fail & Article api fail + * - SourceActor should reply with ProfileFailed. + * - Controller should return Internal Server Error (500). * @author Nattamon Paiboon */ @Test - public void testSource_withCache() { - // Mock "sourcesInfo" - SourcesInfo mockSourcesInfo = mock(SourcesInfo.class); - when(mockCache.get(eq("sourcesInfo"))) - .thenReturn(CompletableFuture.completedFuture(Optional.of(mockSourcesInfo))); - - // mock "source_test" - Source mockCachedSource = mock(Source.class); - when(mockCache.get(eq("source_test"))) - .thenReturn(CompletableFuture.completedFuture(Optional.of(mockCachedSource))); + public void testSource_Failure_ApiError() { + // Mock Sources Info (Failure) + WSRequest sourcesRequest = mock(WSRequest.class); + WSResponse sourcesResponse = mock(WSResponse.class); + when(sourcesResponse.getStatus()).thenReturn(500); + when(sourcesRequest.get()).thenReturn(CompletableFuture.completedFuture(sourcesResponse)); + when(sourcesRequest.setRequestTimeout(any())).thenReturn(sourcesRequest); + when(sourcesRequest.addQueryParameter(anyString(), anyString())).thenReturn(sourcesRequest); + + // Mock Article Response (FAILURE) + WSRequest articlesRequest = mock(WSRequest.class); + WSResponse articlesResponse = mock(WSResponse.class); + when(articlesResponse.getStatus()).thenReturn(500); // Internal Error + when(articlesRequest.get()).thenReturn(CompletableFuture.completedFuture(articlesResponse)); + when(articlesRequest.setRequestTimeout(any())).thenReturn(articlesRequest); + when(articlesRequest.addQueryParameter(anyString(), anyString())).thenReturn(articlesRequest); + + when(mockWsClient.url("https://newsapi.org/v2/top-headlines/sources")).thenReturn(sourcesRequest); + when(mockWsClient.url("https://newsapi.org/v2/everything")).thenReturn(articlesRequest); Http.RequestBuilder request = new Http.RequestBuilder() .method(GET) - .uri("/source/Test?identifier=test"); + .uri("/source/cnn?identifier=cnn"); Result result = route(app, request); - // --- Assert --- - assertEquals("Status should be OK", OK, result.status()); - verify(mockCache, times(1)).get("sourcesInfo"); - verify(mockCache, times(1)).get("source_test"); - - // Verify NO API calls were made - verify(mockWsClient, never()).url(anyString()); - - // Verify NO cache sets were made - verify(mockCache, never()).set(anyString(), any()); - verify(mockCache, never()).set(anyString(), any(), anyInt()); + assertEquals(INTERNAL_SERVER_ERROR, result.status()); } /** From 2fe0431f1d3d7d36e07782da1485ebc739bba41e Mon Sep 17 00:00:00 2001 From: nattamon Date: Sat, 29 Nov 2025 14:49:15 -0500 Subject: [PATCH 10/18] EDIT move create source/sourceInfo actor to supervisor instead of controller --- app/actors/SourceActor.java | 14 +-- app/actors/SourcesInfoActor.java | 14 +-- app/actors/SupervisorActor.java | 151 +++++++++++--------------- app/controllers/HomeController.java | 27 +++-- build.sbt | 8 +- test/actors/SourceActorTest.java | 7 +- test/actors/SourcesInfoActorTest.java | 6 +- 7 files changed, 109 insertions(+), 118 deletions(-) diff --git a/app/actors/SourceActor.java b/app/actors/SourceActor.java index 53361b1..32cbd16 100644 --- a/app/actors/SourceActor.java +++ b/app/actors/SourceActor.java @@ -26,19 +26,19 @@ * * @author Nattamon Paiboon */ -public class SourceActor extends AbstractBehavior { +public class SourceActor extends AbstractBehavior { /** HTTP client used to call external APIs (e.g., NewsAPI) and fetch JSON responses. */ private final WSClient wsClient; /** Cache to store map of source profile */ private final Map cache = new HashMap<>(); /** Message interface */ - public interface Command {} + public interface Message {} /** * Message to request a profile for a specific news source. */ - public static class GetProfile implements Command { + public static class GetProfile implements Message { /** The ID or domain to search */ public final String identifier; /** The display name of the source. */ @@ -78,23 +78,23 @@ public ProfileFailed(String error) { } } - public static Behavior create(WSClient wsClient) { + public static Behavior create(WSClient wsClient) { return Behaviors.setup(context -> new SourceActor(context, wsClient)); } - private SourceActor(ActorContext context, WSClient wsClient) { + private SourceActor(ActorContext context, WSClient wsClient) { super(context); this.wsClient = wsClient; } @Override - public Receive createReceive() { + public Receive createReceive() { return newReceiveBuilder() .onMessage(GetProfile.class, this::onGetProfile) .build(); } - private Behavior onGetProfile(GetProfile msg) { + private Behavior onGetProfile(GetProfile msg) { // check cachr for source article first if (cache.containsKey(msg.identifier)) { getContext().getLog().info("Returning cached profile for: {}", msg.identifier); diff --git a/app/actors/SourcesInfoActor.java b/app/actors/SourcesInfoActor.java index 815214e..501f3c5 100644 --- a/app/actors/SourcesInfoActor.java +++ b/app/actors/SourcesInfoActor.java @@ -18,7 +18,7 @@ * once the list is successfully fetched for the first time, it stored in this cachedInfo. * @author Nattamon Paiboon */ -public class SourcesInfoActor extends AbstractBehavior { +public class SourcesInfoActor extends AbstractBehavior { /** HTTP client used to call external APIs (e.g., NewsAPI) and fetch JSON responses. */ private final WSClient wsClient; /** API key read from configuration and attached to outbound API requests. */ @@ -27,13 +27,13 @@ public class SourcesInfoActor extends AbstractBehavior private SourcesInfo cachedInfo = null; /** Message interface */ - public interface Command {} + public interface Message {} /** * Message to request the list of all news sources. * The actor will reply with either SourcesInfoLoaded or SourcesInfoFailed. */ - public static class GetSourcesInfo implements Command { + public static class GetSourcesInfo implements Message { public final ActorRef replyTo; public GetSourcesInfo(ActorRef replyTo) { @@ -64,24 +64,24 @@ public SourcesInfoFailed(String error) { } } - public static Behavior create(WSClient wsClient, String apiKey) { + public static Behavior create(WSClient wsClient, String apiKey) { return Behaviors.setup(context -> new SourcesInfoActor(context, wsClient, apiKey)); } - private SourcesInfoActor(ActorContext context, WSClient wsClient, String apiKey) { + private SourcesInfoActor(ActorContext context, WSClient wsClient, String apiKey) { super(context); this.wsClient = wsClient; this.apiKey = apiKey; } @Override - public Receive createReceive() { + public Receive createReceive() { return newReceiveBuilder() .onMessage(GetSourcesInfo.class, this::onGetSourcesInfo) .build(); } - private Behavior onGetSourcesInfo(GetSourcesInfo msg) { + private Behavior onGetSourcesInfo(GetSourcesInfo msg) { // check cache first if (this.cachedInfo != null) { getContext().getLog().info("Returning cached SourcesInfo"); diff --git a/app/actors/SupervisorActor.java b/app/actors/SupervisorActor.java index 361d01b..122ba24 100644 --- a/app/actors/SupervisorActor.java +++ b/app/actors/SupervisorActor.java @@ -3,108 +3,85 @@ import org.apache.pekko.NotUsed; import org.apache.pekko.actor.typed.ActorRef; import org.apache.pekko.actor.typed.Behavior; -import org.apache.pekko.actor.typed.SupervisorStrategy; -import org.apache.pekko.actor.typed.Terminated; import org.apache.pekko.actor.typed.javadsl.Behaviors; import org.apache.pekko.stream.javadsl.Flow; import com.fasterxml.jackson.databind.JsonNode; import play.cache.AsyncCacheApi; import play.libs.ws.WSClient; -import java.time.Duration; -import java.util.HashMap; -import java.util.Map; - /** - * Parent actor that manages the creation of individual actors - * for each WebSocket connection / logical user. - * - * This actor is responsible for: - * - Creating at most one UserActor per logical user id - * - Supervising children and restarting them on failures (e.g., News API errors) - * - Returning the Flow that is attached - * to the Play WebSocket for a given user - * - * @author Siming Yi + * Parent actor that manages the creation of individual UserActors for each WebSocket connection. */ public final class SupervisorActor { + private SupervisorActor() {} - private SupervisorActor() { - // static utility – no instances - } - - /** - * Message used by the controller to request (or reuse) a {@link UserActor} - * and obtain the WebSocket {@link Flow}. - */ - public static final class Create { - /** Stable identifier for the logical user / session (e.g., session token). */ - public final String id; + public interface Message {} - /** Actor that will receive the materialized WebSocket flow. */ - public final ActorRef> replyTo; + // Message to request the creation of a UserActor + public static final class Create implements Message { + final String id; + final ActorRef> replyTo; - public Create(String id, ActorRef> replyTo) { - this.id = id; - this.replyTo = replyTo; - } + public Create(String id, ActorRef> replyTo) { + this.id = id; + this.replyTo = replyTo; } + } - /** - * Factory method for the supervisor behavior. - * - * @param wsClient WS client used by child actors when calling the News API - * @param apiKey API key for the News API - * @param asyncCache Cache used by child actors to persist user search history - * @return behavior handling {@link Create} messages - */ - public static Behavior create(WSClient wsClient, - String apiKey, - AsyncCacheApi asyncCache) { - - return Behaviors.setup(context -> { - // one UserActor per logical user id - Map> children = new HashMap<>(); - - return Behaviors - .receive(SupervisorActor.Create.class) - .onMessage(SupervisorActor.Create.class, msg -> { - ActorRef child = children.get(msg.id); - if (child == null) { - Behavior childBehavior = - Behaviors.supervise( - UserActor.create(msg.id, wsClient, apiKey, asyncCache) - ).onFailure( - Exception.class, - SupervisorStrategy - .restart() - .withLimit(3, Duration.ofMinutes(1)) - ); - - child = context.spawn(childBehavior, "userActor-" + msg.id); - context.watch(child); - children.put(msg.id, child); - context.getLog().info("Created UserActor for id={}", msg.id); - } else { - context.getLog().info("Reusing existing UserActor for id={}", msg.id); - } + /** + * For get other additional actors which will be used in controller ( sourceActor and sourceInfoActor ) + */ + public static final class GetOtherActors implements Message { + public final ActorRef replyTo; + public GetOtherActors(ActorRef replyTo) { + this.replyTo = replyTo; + } + } - child.tell(new UserActor.Connect(msg.replyTo)); - return Behaviors.same(); - }) - .onSignal(Terminated.class, sig -> { - ActorRef ref = sig.getRef(); + /** + * Store other additional actors like SourceActor and SourceInfoActor + */ + public static final class OtherActors { + public final ActorRef sourceActor; + public final ActorRef sourcesInfoActor; - children.values().removeIf(child -> child.equals(ref)); + public OtherActors(ActorRef sourceActor, ActorRef sourcesInfoActor) { + this.sourceActor = sourceActor; + this.sourcesInfoActor = sourcesInfoActor; + } + } - context.getLog().info( - "Child actor {} terminated and was removed from supervisor registry.", - ref.path() - ); + // TODO rm cache + public static Behavior create(WSClient wsClient, String apiKey, AsyncCacheApi asyncCache) { + return Behaviors.setup(context -> { + // create source actor + ActorRef sourceActor = context.spawn( + SourceActor.create(wsClient), + "sourceActor" + ); + // create sourceInfo actor + ActorRef sourcesInfoActor = context.spawn( + SourcesInfoActor.create(wsClient, apiKey), + "sourcesInfoActor" + ); + context.getLog().info("sourceActor and sourcesInfoActor spawned and ready"); - return Behaviors.same(); - }) - .build(); - }); - } -} + return Behaviors.receive(Message.class) + .onMessage(Create.class, create -> { + // Spawn a new UserActor for this specific connection ID + ActorRef child = context.spawn( + UserActor.create(create.id, wsClient, apiKey, asyncCache), + "userActor-" + create.id + ); + // Ask the child to establish the stream + child.tell(new UserActor.Connect(create.replyTo)); + return Behaviors.same(); + }) + .onMessage(GetOtherActors.class, msg -> { + msg.replyTo.tell(new OtherActors(sourceActor, sourcesInfoActor)); + return Behaviors.same(); + }) + .build(); + }); + } +} \ No newline at end of file diff --git a/app/controllers/HomeController.java b/app/controllers/HomeController.java index eafdc80..9d1249a 100644 --- a/app/controllers/HomeController.java +++ b/app/controllers/HomeController.java @@ -45,12 +45,12 @@ public class HomeController extends Controller { private final AsyncCacheApi asyncCache; private final ActorSystem actorSystem; - private final ActorRef userParentActor; + private final ActorRef supervisorActor; /** Actor of the source profile */ - private final ActorRef sourceActor; + private final ActorRef sourceActor; /** Actor of the sources information */ - private final ActorRef sourcesInfoActor; + private final ActorRef sourcesInfoActor; /** * Creates a HomeController with the required web client, caching system, and API key. @@ -68,11 +68,20 @@ public HomeController(WSClient ws, AsyncCacheApi asyncCache, Config config, Acto this.actorSystem = actorSystem; // TODO rm cache - this.userParentActor = actorSystem.systemActorOf( - SupervisorActor.create(ws, apiKey, asyncCache), "UserParentActor", Props.empty() + this.supervisorActor = actorSystem.systemActorOf( + SupervisorActor.create(ws, apiKey, asyncCache), "SupervisorActor", Props.empty() ); - this.sourceActor = actorSystem.systemActorOf(SourceActor.create(ws), "sourceActor", Props.empty()); - this.sourcesInfoActor = actorSystem.systemActorOf(SourcesInfoActor.create(ws, apiKey), "sourcesInfoActor", Props.empty()); + + CompletionStage futureActors = AskPattern.ask( + supervisorActor, + SupervisorActor.GetOtherActors::new, + Duration.ofSeconds(5), + actorSystem.scheduler() + ); + + SupervisorActor.OtherActors actors = futureActors.toCompletableFuture().join(); + this.sourceActor = actors.sourceActor; + this.sourcesInfoActor = actors.sourcesInfoActor; } /** @@ -93,9 +102,9 @@ public WebSocket ws() { Scheduler scheduler = actorSystem.scheduler(); - // Ask the UserParentActor to create a flow for this user + // Ask the SupervisorActor to create a flow for this user CompletionStage> future = AskPattern.ask( - userParentActor, + supervisorActor, (ActorRef> replyTo) -> new SupervisorActor.Create(id, replyTo), Duration.ofSeconds(10), scheduler diff --git a/build.sbt b/build.sbt index d5d2b3c..908de27 100644 --- a/build.sbt +++ b/build.sbt @@ -17,7 +17,13 @@ libraryDependencies ++= Seq( "org.junit.platform" % "junit-platform-launcher" % "1.10.2" % Test, "io.whelk.flesch.kincaid" % "whelk-flesch-kincaid" % "0.1.11", "org.mockito" % "mockito-core" % "5.11.0" % Test, - "org.apache.pekko" %% "pekko-actor-testkit-typed" % "1.3.0" % Test, + // ---------- Apache Pekko ---------- + "org.apache.pekko" %% "pekko-actor-typed" % "1.0.3", + "org.apache.pekko" %% "pekko-stream" % "1.0.3", + "org.apache.pekko" %% "pekko-actor-testkit-typed" % "1.0.3" % Test, + "org.apache.pekko" %% "pekko-stream-testkit" % "1.0.3" % Test, + "org.assertj" % "assertj-core" % "3.27.3" % Test, + "org.awaitility" % "awaitility" % "4.3.0" % Test, ) diff --git a/test/actors/SourceActorTest.java b/test/actors/SourceActorTest.java index f2aea36..b4e0dd6 100644 --- a/test/actors/SourceActorTest.java +++ b/test/actors/SourceActorTest.java @@ -15,7 +15,6 @@ import play.libs.ws.WSRequest; import play.libs.ws.WSResponse; -import java.time.Duration; import java.util.concurrent.CompletableFuture; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -64,7 +63,7 @@ public void testGetProfile_Success() { when(mockResponse.asJson()).thenReturn(json); // Spawn Actor - ActorRef actor = testKit.spawn(SourceActor.create(mockWsClient)); + ActorRef actor = testKit.spawn(SourceActor.create(mockWsClient)); TestProbe probe = testKit.createTestProbe(); // Send Message @@ -88,7 +87,7 @@ public void testGetProfile_Failure() { when(mockRequest.get()).thenReturn(CompletableFuture.completedFuture(mockResponse)); when(mockResponse.getStatus()).thenReturn(400); // Error code - ActorRef actor = testKit.spawn(SourceActor.create(mockWsClient)); + ActorRef actor = testKit.spawn(SourceActor.create(mockWsClient)); TestProbe probe = testKit.createTestProbe(); actor.tell(new SourceActor.GetProfile("bad-id", "Test", mockSourcesInfo, "key", probe.getRef())); @@ -114,7 +113,7 @@ public void testGetProfile_Caching() { when(mockResponse.getStatus()).thenReturn(200); when(mockResponse.asJson()).thenReturn(json); - ActorRef actor = testKit.spawn(SourceActor.create(mockWsClient)); + ActorRef actor = testKit.spawn(SourceActor.create(mockWsClient)); TestProbe probe = testKit.createTestProbe(); actor.tell(new SourceActor.GetProfile("cache-id", "Test", null, "key", probe.getRef())); diff --git a/test/actors/SourcesInfoActorTest.java b/test/actors/SourcesInfoActorTest.java index b10223e..c1c47ed 100644 --- a/test/actors/SourcesInfoActorTest.java +++ b/test/actors/SourcesInfoActorTest.java @@ -67,7 +67,7 @@ public void testGetSourcesInfo_Success() { when(mockResponse.getStatus()).thenReturn(200); when(mockResponse.asJson()).thenReturn(json); - ActorRef actor = testKit.spawn(SourcesInfoActor.create(mockWsClient, "key")); + ActorRef actor = testKit.spawn(SourcesInfoActor.create(mockWsClient, "key")); TestProbe probe = testKit.createTestProbe(); // Act @@ -92,7 +92,7 @@ public void testGetSourcesInfo_Failure() { when(mockRequest.get()).thenReturn(CompletableFuture.completedFuture(mockResponse)); when(mockResponse.getStatus()).thenReturn(500); // Server error - ActorRef actor = testKit.spawn(SourcesInfoActor.create(mockWsClient, "key")); + ActorRef actor = testKit.spawn(SourcesInfoActor.create(mockWsClient, "key")); TestProbe probe = testKit.createTestProbe(); actor.tell(new SourcesInfoActor.GetSourcesInfo(probe.getRef())); @@ -117,7 +117,7 @@ public void testGetSourcesInfo_Caching() { when(mockResponse.getStatus()).thenReturn(200); when(mockResponse.asJson()).thenReturn(json); - ActorRef actor = testKit.spawn(SourcesInfoActor.create(mockWsClient, "key")); + ActorRef actor = testKit.spawn(SourcesInfoActor.create(mockWsClient, "key")); TestProbe probe = testKit.createTestProbe(); // 1st Call From 2a1fd8d9a45259cb90d11a11e1240e304f133370 Mon Sep 17 00:00:00 2001 From: Siming Yi Date: Sat, 29 Nov 2025 16:11:56 -0500 Subject: [PATCH 11/18] EDIT supervisor actor --- app/actors/SupervisorActor.java | 78 ++++++++++++++++++++++++++------- 1 file changed, 62 insertions(+), 16 deletions(-) diff --git a/app/actors/SupervisorActor.java b/app/actors/SupervisorActor.java index 122ba24..f508c39 100644 --- a/app/actors/SupervisorActor.java +++ b/app/actors/SupervisorActor.java @@ -3,14 +3,25 @@ import org.apache.pekko.NotUsed; import org.apache.pekko.actor.typed.ActorRef; import org.apache.pekko.actor.typed.Behavior; +import org.apache.pekko.actor.typed.SupervisorStrategy; +import org.apache.pekko.actor.typed.Terminated; import org.apache.pekko.actor.typed.javadsl.Behaviors; import org.apache.pekko.stream.javadsl.Flow; import com.fasterxml.jackson.databind.JsonNode; import play.cache.AsyncCacheApi; import play.libs.ws.WSClient; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; /** * Parent actor that manages the creation of individual UserActors for each WebSocket connection. + * This actor is responsible for: + * - Creating at most one UserActor per logical user id + * - Supervising children and restarting them on failures (e.g., News API errors) + * - Returning the Flow that is attached + * to the Play WebSocket for a given user + * - Spawning shared SourceActor / SourcesInfoActor that are reused by the controller */ public final class SupervisorActor { private SupervisorActor() {} @@ -66,22 +77,57 @@ public static Behavior create(WSClient wsClient, String apiKey, AsyncCa ); context.getLog().info("sourceActor and sourcesInfoActor spawned and ready"); - return Behaviors.receive(Message.class) - .onMessage(Create.class, create -> { - // Spawn a new UserActor for this specific connection ID - ActorRef child = context.spawn( - UserActor.create(create.id, wsClient, apiKey, asyncCache), - "userActor-" + create.id - ); - // Ask the child to establish the stream - child.tell(new UserActor.Connect(create.replyTo)); - return Behaviors.same(); - }) - .onMessage(GetOtherActors.class, msg -> { - msg.replyTo.tell(new OtherActors(sourceActor, sourcesInfoActor)); - return Behaviors.same(); - }) - .build(); + Map> children = new HashMap<>(); + + + return Behaviors + .receive(Message.class) + .onMessage(Create.class, msg -> { + // Reuse existing UserActor for this id if present + ActorRef child = children.get(msg.id); + if (child == null) { + // Wrap child in a supervision strategy (restart on failure) + Behavior childBehavior = + Behaviors.supervise( + UserActor.create(msg.id, wsClient, apiKey, asyncCache) + ).onFailure( + Exception.class, + SupervisorStrategy + .restart() + .withLimit(3, Duration.ofMinutes(1)) + ); + + child = context.spawn(childBehavior, "userActor-" + msg.id); + context.watch(child); + children.put(msg.id, child); + + context.getLog().info("Created UserActor for id={}", msg.id); + } else { + context.getLog().info("Reusing existing UserActor for id={}", msg.id); + } + + // Ask the child to establish the WebSocket stream + child.tell(new UserActor.Connect(msg.replyTo)); + return Behaviors.same(); + }) + .onMessage(GetOtherActors.class, msg -> { + msg.replyTo.tell(new OtherActors(sourceActor, sourcesInfoActor)); + return Behaviors.same(); + }) + .onSignal(Terminated.class, sig -> { + ActorRef terminatedRef = sig.getRef(); + + // Remove terminated child from registry, if present + children.values().removeIf(child -> child.equals(terminatedRef)); + + context.getLog().info( + "Child actor {} terminated and was removed from supervisor registry.", + terminatedRef.path() + ); + + return Behaviors.same(); + }) + .build(); }); } } \ No newline at end of file From dc710ad20be8f5c909f28cb5e3a5a898ff0c26d9 Mon Sep 17 00:00:00 2001 From: Siming Yi Date: Sat, 29 Nov 2025 16:41:52 -0500 Subject: [PATCH 12/18] SOLVE conflicts --- app/actors/SupervisorActor.java | 205 ++++++++++---------- app/actors/UserActor.java | 327 ++++++++++++++++---------------- 2 files changed, 272 insertions(+), 260 deletions(-) diff --git a/app/actors/SupervisorActor.java b/app/actors/SupervisorActor.java index f508c39..b7d1bea 100644 --- a/app/actors/SupervisorActor.java +++ b/app/actors/SupervisorActor.java @@ -10,6 +10,7 @@ import com.fasterxml.jackson.databind.JsonNode; import play.cache.AsyncCacheApi; import play.libs.ws.WSClient; + import java.time.Duration; import java.util.HashMap; import java.util.Map; @@ -24,110 +25,112 @@ * - Spawning shared SourceActor / SourcesInfoActor that are reused by the controller */ public final class SupervisorActor { - private SupervisorActor() {} + private SupervisorActor() {} + + public interface Message {} + + // Message to request the creation of a UserActor + public static final class Create implements Message { + final String id; + final ActorRef> replyTo; - public interface Message {} + public Create(String id, ActorRef> replyTo) { + this.id = id; + this.replyTo = replyTo; + } + } - // Message to request the creation of a UserActor - public static final class Create implements Message { - final String id; - final ActorRef> replyTo; + /** + * For get other additional actors which will be used in controller ( sourceActor and sourceInfoActor ) + */ + public static final class GetOtherActors implements Message { + public final ActorRef replyTo; - public Create(String id, ActorRef> replyTo) { - this.id = id; - this.replyTo = replyTo; + public GetOtherActors(ActorRef replyTo) { + this.replyTo = replyTo; + } } - } - - /** - * For get other additional actors which will be used in controller ( sourceActor and sourceInfoActor ) - */ - public static final class GetOtherActors implements Message { - public final ActorRef replyTo; - public GetOtherActors(ActorRef replyTo) { - this.replyTo = replyTo; + + /** + * Store other additional actors like SourceActor and SourceInfoActor + */ + public static final class OtherActors { + public final ActorRef sourceActor; + public final ActorRef sourcesInfoActor; + + public OtherActors(ActorRef sourceActor, + ActorRef sourcesInfoActor) { + this.sourceActor = sourceActor; + this.sourcesInfoActor = sourcesInfoActor; + } } - } - - /** - * Store other additional actors like SourceActor and SourceInfoActor - */ - public static final class OtherActors { - public final ActorRef sourceActor; - public final ActorRef sourcesInfoActor; - - public OtherActors(ActorRef sourceActor, ActorRef sourcesInfoActor) { - this.sourceActor = sourceActor; - this.sourcesInfoActor = sourcesInfoActor; + + // TODO rm cache + public static Behavior create(WSClient wsClient, String apiKey, AsyncCacheApi asyncCache) { + return Behaviors.setup(context -> { + // create source actor + ActorRef sourceActor = context.spawn( + SourceActor.create(wsClient), + "sourceActor" + ); + // create sourceInfo actor + ActorRef sourcesInfoActor = context.spawn( + SourcesInfoActor.create(wsClient, apiKey), + "sourcesInfoActor" + ); + context.getLog().info("sourceActor and sourcesInfoActor spawned and ready"); + + // one UserActor per logical user id + Map> children = new HashMap<>(); + + return Behaviors + .receive(Message.class) + .onMessage(Create.class, msg -> { + // Reuse existing UserActor for this id if present + ActorRef child = children.get(msg.id); + if (child == null) { + // Wrap child in a supervision strategy (restart on failure) + Behavior childBehavior = + Behaviors.supervise( + UserActor.create(msg.id, wsClient, apiKey, asyncCache) + ).onFailure( + Exception.class, + SupervisorStrategy + .restart() + .withLimit(3, Duration.ofMinutes(1)) + ); + + child = context.spawn(childBehavior, "userActor-" + msg.id); + context.watch(child); + children.put(msg.id, child); + + context.getLog().info("Created UserActor for id={}", msg.id); + } else { + context.getLog().info("Reusing existing UserActor for id={}", msg.id); + } + + // Ask the child to establish the WebSocket stream + child.tell(new UserActor.Connect(msg.replyTo)); + return Behaviors.same(); + }) + .onMessage(GetOtherActors.class, msg -> { + msg.replyTo.tell(new OtherActors(sourceActor, sourcesInfoActor)); + return Behaviors.same(); + }) + .onSignal(Terminated.class, sig -> { + ActorRef terminatedRef = sig.getRef(); + + // Remove terminated child from registry, if present + children.values().removeIf(child -> child.equals(terminatedRef)); + + context.getLog().info( + "Child actor {} terminated and was removed from supervisor registry.", + terminatedRef.path() + ); + + return Behaviors.same(); + }) + .build(); + }); } - } - - // TODO rm cache - public static Behavior create(WSClient wsClient, String apiKey, AsyncCacheApi asyncCache) { - return Behaviors.setup(context -> { - // create source actor - ActorRef sourceActor = context.spawn( - SourceActor.create(wsClient), - "sourceActor" - ); - // create sourceInfo actor - ActorRef sourcesInfoActor = context.spawn( - SourcesInfoActor.create(wsClient, apiKey), - "sourcesInfoActor" - ); - context.getLog().info("sourceActor and sourcesInfoActor spawned and ready"); - - Map> children = new HashMap<>(); - - - return Behaviors - .receive(Message.class) - .onMessage(Create.class, msg -> { - // Reuse existing UserActor for this id if present - ActorRef child = children.get(msg.id); - if (child == null) { - // Wrap child in a supervision strategy (restart on failure) - Behavior childBehavior = - Behaviors.supervise( - UserActor.create(msg.id, wsClient, apiKey, asyncCache) - ).onFailure( - Exception.class, - SupervisorStrategy - .restart() - .withLimit(3, Duration.ofMinutes(1)) - ); - - child = context.spawn(childBehavior, "userActor-" + msg.id); - context.watch(child); - children.put(msg.id, child); - - context.getLog().info("Created UserActor for id={}", msg.id); - } else { - context.getLog().info("Reusing existing UserActor for id={}", msg.id); - } - - // Ask the child to establish the WebSocket stream - child.tell(new UserActor.Connect(msg.replyTo)); - return Behaviors.same(); - }) - .onMessage(GetOtherActors.class, msg -> { - msg.replyTo.tell(new OtherActors(sourceActor, sourcesInfoActor)); - return Behaviors.same(); - }) - .onSignal(Terminated.class, sig -> { - ActorRef terminatedRef = sig.getRef(); - - // Remove terminated child from registry, if present - children.values().removeIf(child -> child.equals(terminatedRef)); - - context.getLog().info( - "Child actor {} terminated and was removed from supervisor registry.", - terminatedRef.path() - ); - - return Behaviors.same(); - }) - .build(); - }); - } -} \ No newline at end of file +} diff --git a/app/actors/UserActor.java b/app/actors/UserActor.java index 48e16e1..fd43dce 100644 --- a/app/actors/UserActor.java +++ b/app/actors/UserActor.java @@ -7,9 +7,14 @@ import org.apache.pekko.actor.typed.javadsl.ActorContext; import org.apache.pekko.actor.typed.javadsl.AskPattern; import org.apache.pekko.actor.typed.javadsl.Behaviors; -import org.apache.pekko.stream.Materializer; -import org.apache.pekko.stream.javadsl.*; import org.apache.pekko.japi.Pair; +import org.apache.pekko.stream.Materializer; +import org.apache.pekko.stream.javadsl.BroadcastHub; +import org.apache.pekko.stream.javadsl.Flow; +import org.apache.pekko.stream.javadsl.Keep; +import org.apache.pekko.stream.javadsl.MergeHub; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; import com.fasterxml.jackson.databind.JsonNode; import play.cache.AsyncCacheApi; import play.libs.Json; @@ -21,166 +26,170 @@ import java.util.List; import java.util.concurrent.CompletionStage; +/** + * Per-user actor that owns the WebSocket stream and the search history. + */ public class UserActor { - // --- Messages --- - public interface Message {} + // --- Messages --- + public interface Message {} - public static final class Connect implements Message { - final ActorRef> replyTo; - public Connect(ActorRef> replyTo) { - this.replyTo = replyTo; - } - } - - public static final class PerformSearch implements Message { - public final String query; - public final String sortBy; - public PerformSearch(String query, String sortBy) { - this.query = query; - this.sortBy = sortBy; + public static final class Connect implements Message { + final ActorRef> replyTo; + public Connect(ActorRef> replyTo) { + this.replyTo = replyTo; + } } - } - - // Internal message to stop the actor safely - private static final class InternalStop implements Message {} - - // --- State --- - private final String id; - private final WSClient ws; - private final String apiKey; - private final ActorContext context; - private final List searchHistory = new ArrayList<>(); - - /** Child actor that performs sentiment analysis for this user. */ - private final ActorRef sentimentActor; - - private final Sink hubSink; - private final Flow websocketFlow; - - // TODO rm cache - private final AsyncCacheApi asyncCache; - // --- Factory --- - // TODO rm cache - public static Behavior create(String id, WSClient ws, String apiKey, AsyncCacheApi asyncCache) { - return Behaviors.setup(context -> new UserActor(id, ws, apiKey, asyncCache, context).behavior()); - } - - // --- Constructor --- - // TODO rm cache - private UserActor(String id, WSClient ws, String apiKey, AsyncCacheApi asyncCache, ActorContext context) { - this.id = id; - this.ws = ws; - this.apiKey = apiKey; - this.context = context; - this.asyncCache = asyncCache; - - this.sentimentActor = context.spawn( - SentimentAnalyzerActor.create(), - "SentimentAnalyzer-" + id - ); - - Materializer mat = Materializer.matFromSystem(context.getSystem()); - - ActorRef self = context.getSelf(); - - Pair, Source> sinkSourcePair = - MergeHub.of(JsonNode.class, 16) - .toMat(BroadcastHub.of(JsonNode.class, 256), Keep.both()) - .run(mat); - - this.hubSink = sinkSourcePair.first(); - Source hubSource = sinkSourcePair.second(); - - // Handle incoming messages from the WebSocket - Sink> jsonSink = Sink.foreach((JsonNode json) -> { - if (json.has("query")) { - String query = json.get("query").asText(); - String sortBy = "popularity"; - if (json.has("sortBy")) { - sortBy = json.get("sortBy").asText(); + + public static final class PerformSearch implements Message { + public final String query; + public final String sortBy; + public PerformSearch(String query, String sortBy) { + this.query = query; + this.sortBy = sortBy; } - self.tell(new PerformSearch(query, sortBy)); - } - }); - - // ping websocket to keep it alive - Source heartbeat = Source.tick( - Duration.ZERO, - Duration.ofSeconds(30), - (JsonNode) Json.newObject().put("type", "ping") - ).mapMaterializedValue(c -> NotUsed.getInstance()); - - this.websocketFlow = Flow.fromSinkAndSourceCoupled(jsonSink, hubSource.merge(heartbeat)) - .watchTermination((n, stage) -> { - stage.whenComplete((done, throwable) -> self.tell(new InternalStop())); - return NotUsed.getInstance(); - }); - } - - // --- Behavior --- - private Behavior behavior() { - return Behaviors.receive(Message.class) - .onMessage(Connect.class, msg -> { - context.getLog().info("Client connected: {}", id); - msg.replyTo.tell(websocketFlow); - return Behaviors.same(); - }) - .onMessage(PerformSearch.class, msg -> { - context.getLog().info("User {} searching for: {}", id, msg.query); - - Search search = new Search(msg.query, msg.sortBy, apiKey); - - // call News API (as before) - search.fetchResults(ws) - // after results arrive, ask the SentimentAnalyzerActor - .thenCompose(v -> { - List texts = search.getResults().stream() - .limit(50) - .map(a -> a.getDescription() != null ? a.getDescription() : "") - .toList(); - - // empty -> neutral - if (texts.isEmpty()) { - search.setSentiment(":-|"); - return java.util.concurrent.CompletableFuture.completedFuture(null); - } - - // use ask pattern for sentiment analyze - return AskPattern.ask( - sentimentActor, - (ActorRef replyTo) -> - new SentimentAnalyzerActor.AnalyzeAverage(texts, replyTo), - Duration.ofSeconds(2), - context.getSystem().scheduler() - ).thenAccept(result -> { - search.setSentiment(result.sentiment); + } + + // Internal message to stop the actor safely + private static final class InternalStop implements Message {} + + // --- State --- + private final String id; + private final WSClient ws; + private final String apiKey; + private final ActorContext context; + private final List searchHistory = new ArrayList<>(); + + /** Child actor that performs sentiment analysis for this user. */ + private final ActorRef sentimentActor; + + private final Sink hubSink; + private final Flow websocketFlow; + + // TODO rm cache + private final AsyncCacheApi asyncCache; + + // --- Factory --- + // TODO rm cache + public static Behavior create(String id, WSClient ws, String apiKey, AsyncCacheApi asyncCache) { + return Behaviors.setup(context -> new UserActor(id, ws, apiKey, asyncCache, context).behavior()); + } + + // --- Constructor --- + // TODO rm cache + private UserActor(String id, WSClient ws, String apiKey, AsyncCacheApi asyncCache, ActorContext context) { + this.id = id; + this.ws = ws; + this.apiKey = apiKey; + this.context = context; + this.asyncCache = asyncCache; + + // child sentiment analyzer actor + this.sentimentActor = context.spawn( + SentimentAnalyzerActor.create(), + "SentimentAnalyzer-" + id + ); + + Materializer mat = Materializer.matFromSystem(context.getSystem()); + + ActorRef self = context.getSelf(); + + Pair, Source> sinkSourcePair = + MergeHub.of(JsonNode.class, 16) + .toMat(BroadcastHub.of(JsonNode.class, 256), Keep.both()) + .run(mat); + + this.hubSink = sinkSourcePair.first(); + Source hubSource = sinkSourcePair.second(); + + // Handle incoming messages from the WebSocket + Sink> jsonSink = Sink.foreach((JsonNode json) -> { + if (json.has("query")) { + String query = json.get("query").asText(); + String sortBy = "popularity"; + if (json.has("sortBy")) { + sortBy = json.get("sortBy").asText(); + } + self.tell(new PerformSearch(query, sortBy)); + } + }); + + // ping websocket to keep it alive + Source heartbeat = Source.tick( + Duration.ZERO, + Duration.ofSeconds(30), + (JsonNode) Json.newObject().put("type", "ping") + ).mapMaterializedValue(c -> NotUsed.getInstance()); + + this.websocketFlow = Flow.fromSinkAndSourceCoupled(jsonSink, hubSource.merge(heartbeat)) + .watchTermination((n, stage) -> { + stage.whenComplete((done, throwable) -> self.tell(new InternalStop())); + return NotUsed.getInstance(); + }); + } + + // --- Behavior --- + private Behavior behavior() { + return Behaviors.receive(Message.class) + .onMessage(Connect.class, msg -> { + context.getLog().info("Client connected: {}", id); + msg.replyTo.tell(websocketFlow); + return Behaviors.same(); + }) + .onMessage(PerformSearch.class, msg -> { + context.getLog().info("User {} searching for: {}", id, msg.query); + + Search search = new Search(msg.query, msg.sortBy, apiKey); + + // call News API + search.fetchResults(ws) + // after results arrive, ask the SentimentAnalyzerActor + .thenCompose(v -> { + List texts = search.getResults().stream() + .limit(50) + .map(a -> a.getDescription() != null ? a.getDescription() : "") + .toList(); + + // empty -> neutral + if (texts.isEmpty()) { + search.setSentiment(":-|"); + return java.util.concurrent.CompletableFuture.completedFuture(null); + } + + // use ask pattern for sentiment analysis + return AskPattern.ask( + sentimentActor, + (ActorRef replyTo) -> + new SentimentAnalyzerActor.AnalyzeAverage(texts, replyTo), + Duration.ofSeconds(2), + context.getSystem().scheduler() + ).thenAccept(result -> search.setSentiment(result.sentiment)); + }) + .thenAccept(v2 -> { + // update history + searchHistory.add(0, search); + if (searchHistory.size() > 10) { + searchHistory.remove(searchHistory.size() - 1); + } + + // TODO rm cache + asyncCache.set(id, new ArrayList<>(searchHistory)); + + // Send updated results back to the user + JsonNode response = Json.toJson(searchHistory); + Source.single(response).runWith( + hubSink, + Materializer.matFromSystem(context.getSystem()) + ); }); - }) - .thenAccept(v2 -> { - searchHistory.add(0, search); - if (searchHistory.size() > 10) { - searchHistory.remove(searchHistory.size() - 1); - } - - // TODO rm cache - asyncCache.set(id, new ArrayList<>(searchHistory)); - - // Send updated results back to the user - JsonNode response = Json.toJson(searchHistory); - Source.single(response).runWith( - hubSink, - Materializer.matFromSystem(context.getSystem()) - ); - }); - - return Behaviors.same(); - }) - // Handle the stop message - .onMessage(InternalStop.class, msg -> { - context.getLog().info("WebSocket closed for user {}. Stopping actor.", id); - return Behaviors.stopped(); - }) - .build(); - } -} \ No newline at end of file + + return Behaviors.same(); + }) + // Handle the stop message + .onMessage(InternalStop.class, msg -> { + context.getLog().info("WebSocket closed for user {}. Stopping actor.", id); + return Behaviors.stopped(); + }) + .build(); + } +} From c117f08c57bcc6c7c36048ec6510decdb963c861 Mon Sep 17 00:00:00 2001 From: Siming Yi Date: Sat, 29 Nov 2025 17:46:33 -0500 Subject: [PATCH 13/18] SOLVE conflicts, and ADD SentimentAnalyzerActorTest --- app/actors/SupervisorActor.java | 7 +- app/actors/UserActor.java | 13 +-- test/actors/SentimentAnalyzerActorTest.java | 91 +++++++++++++++++++++ 3 files changed, 100 insertions(+), 11 deletions(-) create mode 100644 test/actors/SentimentAnalyzerActorTest.java diff --git a/app/actors/SupervisorActor.java b/app/actors/SupervisorActor.java index b7d1bea..b7509dd 100644 --- a/app/actors/SupervisorActor.java +++ b/app/actors/SupervisorActor.java @@ -80,7 +80,10 @@ public static Behavior create(WSClient wsClient, String apiKey, AsyncCa ); context.getLog().info("sourceActor and sourcesInfoActor spawned and ready"); - // one UserActor per logical user id + ActorRef sentimentActor = context.spawn( + SentimentAnalyzerActor.create(), + "sentimentAnalyzer" + ); Map> children = new HashMap<>(); return Behaviors @@ -92,7 +95,7 @@ public static Behavior create(WSClient wsClient, String apiKey, AsyncCa // Wrap child in a supervision strategy (restart on failure) Behavior childBehavior = Behaviors.supervise( - UserActor.create(msg.id, wsClient, apiKey, asyncCache) + UserActor.create(msg.id, wsClient, apiKey, asyncCache, sentimentActor) ).onFailure( Exception.class, SupervisorStrategy diff --git a/app/actors/UserActor.java b/app/actors/UserActor.java index fd43dce..c512209 100644 --- a/app/actors/UserActor.java +++ b/app/actors/UserActor.java @@ -71,24 +71,19 @@ private static final class InternalStop implements Message {} // --- Factory --- // TODO rm cache - public static Behavior create(String id, WSClient ws, String apiKey, AsyncCacheApi asyncCache) { - return Behaviors.setup(context -> new UserActor(id, ws, apiKey, asyncCache, context).behavior()); + public static Behavior create(String id, WSClient ws, String apiKey, AsyncCacheApi asyncCache, ActorRef sentimentActor) { + return Behaviors.setup(context -> new UserActor(id, ws, apiKey, asyncCache, sentimentActor, context).behavior()); } // --- Constructor --- // TODO rm cache - private UserActor(String id, WSClient ws, String apiKey, AsyncCacheApi asyncCache, ActorContext context) { + private UserActor(String id, WSClient ws, String apiKey, AsyncCacheApi asyncCache, ActorRef sentimentActor, ActorContext context) { this.id = id; this.ws = ws; this.apiKey = apiKey; this.context = context; this.asyncCache = asyncCache; - - // child sentiment analyzer actor - this.sentimentActor = context.spawn( - SentimentAnalyzerActor.create(), - "SentimentAnalyzer-" + id - ); + this.sentimentActor = sentimentActor; Materializer mat = Materializer.matFromSystem(context.getSystem()); diff --git a/test/actors/SentimentAnalyzerActorTest.java b/test/actors/SentimentAnalyzerActorTest.java new file mode 100644 index 0000000..e9b6e9b --- /dev/null +++ b/test/actors/SentimentAnalyzerActorTest.java @@ -0,0 +1,91 @@ +package actors; + +import org.apache.pekko.actor.testkit.typed.javadsl.ActorTestKit; +import org.apache.pekko.actor.testkit.typed.javadsl.TestProbe; +import org.apache.pekko.actor.typed.ActorRef; +import org.junit.AfterClass; +import org.junit.Test; + +import static org.junit.Assert.*; + +/** + * Tests for {@link SentimentAnalyzerActor}. + * + * These tests focus on verifying that: + * - the actor replies to AnalyzeSingle / AnalyzeAverage messages, and + * - the reply wraps the sentiment string produced by SentimentAnalyzer + * into a Result message. + * + * The detailed sentiment logic is already covered in SentimentAnalyzer tests. + * + * @author Siming Yi + */ +public class SentimentAnalyzerActorTest { + + private static final ActorTestKit testKit = ActorTestKit.create(); + + @AfterClass + public static void shutdownTestKit() { + testKit.shutdownTestKit(); + } + + @Test + public void analyzeSingle_shouldReplyWithSentimentResult() { + // given + ActorRef actor = + testKit.spawn(SentimentAnalyzerActor.create(), "sentiment-single"); + + TestProbe probe = + testKit.createTestProbe(SentimentAnalyzerActor.Result.class); + + String text = "This is a very happy and joyful day"; + + // when + actor.tell(new SentimentAnalyzerActor.AnalyzeSingle(text, probe.getRef())); + + // then + SentimentAnalyzerActor.Result result = probe.receiveMessage(); + assertNotNull(result); + assertNotNull(result.sentiment); + assertFalse(result.sentiment.isEmpty()); + + // Contract from SentimentAnalyzerActor javadoc: must be one of ":-)", ":-(", ":-|" + assertTrue( + result.sentiment.equals(":-)") + || result.sentiment.equals(":-(") + || result.sentiment.equals(":-|") + ); + } + + @Test + public void analyzeAverage_shouldReplyWithSentimentResult() { + // given + ActorRef actor = + testKit.spawn(SentimentAnalyzerActor.create(), "sentiment-average"); + + TestProbe probe = + testKit.createTestProbe(SentimentAnalyzerActor.Result.class); + + var texts = java.util.List.of( + "Markets are doing well and investors are happy", + "Profits increased and performance was good", + "Overall the outlook is positive and optimistic" + ); + + // when + actor.tell(new SentimentAnalyzerActor.AnalyzeAverage(texts, probe.getRef())); + + // then + SentimentAnalyzerActor.Result result = probe.receiveMessage(); + assertNotNull(result); + assertNotNull(result.sentiment); + assertFalse(result.sentiment.isEmpty()); + + // same contract: sentiment must be one of the three emojis + assertTrue( + result.sentiment.equals(":-)") + || result.sentiment.equals(":-(") + || result.sentiment.equals(":-|") + ); + } +} From e2816b0162a0cb13b651f6204d356d51e7450da8 Mon Sep 17 00:00:00 2001 From: Siming Yi Date: Sat, 29 Nov 2025 20:15:14 -0500 Subject: [PATCH 14/18] SOLVE conflicts of user actor and SupervisorActor --- app/actors/SupervisorActor.java | 11 +++++++++- app/actors/UserActor.java | 36 ++++++++++++++++++++++++++++++++- 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/app/actors/SupervisorActor.java b/app/actors/SupervisorActor.java index 6225a35..629c2a4 100644 --- a/app/actors/SupervisorActor.java +++ b/app/actors/SupervisorActor.java @@ -4,6 +4,8 @@ import org.apache.pekko.actor.typed.ActorRef; import org.apache.pekko.actor.typed.Behavior; import org.apache.pekko.actor.typed.javadsl.Behaviors; +import org.apache.pekko.actor.typed.SupervisorStrategy; +import org.apache.pekko.actor.typed.Terminated; import org.apache.pekko.stream.javadsl.Flow; import com.fasterxml.jackson.databind.JsonNode; import play.cache.AsyncCacheApi; @@ -11,6 +13,7 @@ import java.util.HashMap; import java.util.Map; +import java.time.Duration; /** * Parent actor that manages the creation of individual UserActors @@ -113,6 +116,12 @@ public static Behavior create(WSClient wsClient, String apiKey, AsyncCa context.getLog().info("ReadabilityActor spawned and ready"); + // Create a single shared SentimentAnalyzerActor for the entire application + ActorRef sentimentActor = context.spawn( + SentimentAnalyzerActor.create(), + "sentimentAnalyzerActor" + ); + context.getLog().info("sentimentAnalyzerActor spawned and ready"); // Registry to track active UserActors by session ID Map> userActors = new HashMap<>(); @@ -134,7 +143,7 @@ public static Behavior create(WSClient wsClient, String apiKey, AsyncCa context.getLog().info("Creating NEW UserActor for session: {}", create.id); ActorRef child = context.spawn( - UserActor.create(create.id, asyncCache, newsActor, readabilityActor), + UserActor.create(create.id, asyncCache, newsActor, readabilityActor, sentimentActor), "userActor-" + create.id ); diff --git a/app/actors/UserActor.java b/app/actors/UserActor.java index 59b2d21..74be961 100644 --- a/app/actors/UserActor.java +++ b/app/actors/UserActor.java @@ -242,7 +242,41 @@ private Behavior onReadabilityResult(ReadabilityResult msg) { .map(Search::getFleschReadingScoreOSS) .toList(); - pushUpdatesToClient(); + // After readability is computed, ask SentimentAnalyzerActor to score the latest search + if (!searchHistory.isEmpty()) { + // we take the latest search (at the front of the history) + Search latest = searchHistory.get(0); + + // collect up to 50 article descriptions as input for sentiment analysis + List texts = latest.getResults().stream() + .limit(50) + .map(article -> { + String desc = article.getDescription(); + return desc != null ? desc : ""; + }) + .filter(desc -> !desc.isBlank()) + .toList(); + + if (texts.isEmpty()) { + // No text to analyse – treat as neutral + latest.setSentiment(":-|"); + pushUpdatesToClient(); + } else { + AskPattern.ask( + sentimentActor, + (ActorRef replyTo) -> + new SentimentAnalyzerActor.AnalyzeAverage(texts, replyTo), + Duration.ofSeconds(2), + context.getSystem().scheduler() + ).thenAccept(result -> { + latest.setSentiment(result.sentiment); + pushUpdatesToClient(); + }); + } + } else { + // Nothing in history – just push what we have (probably empty list) + pushUpdatesToClient(); + } return Behaviors.same(); } From b0edd171b1fbc6877fa1c93832044a0f7d66ae23 Mon Sep 17 00:00:00 2001 From: aVr0Ra Date: Sat, 29 Nov 2025 22:33:09 -0500 Subject: [PATCH 15/18] IMPLEMENT UserActorTest with sentiment analyzer tests(no history and no description) --- test/actors/UserActorTest.java | 82 +++++++++++++++++++++++++++++++++- 1 file changed, 80 insertions(+), 2 deletions(-) diff --git a/test/actors/UserActorTest.java b/test/actors/UserActorTest.java index 8929785..36d5521 100644 --- a/test/actors/UserActorTest.java +++ b/test/actors/UserActorTest.java @@ -23,8 +23,10 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.*; import static org.mockito.Mockito.*; @@ -40,6 +42,7 @@ public class UserActorTest { private AsyncCacheApi mockCache; private TestProbe newsActorProbe; private TestProbe readabilityActorProbe; + private TestProbe sentimentActorProbe; private ActorRef userActor; private String userId; @@ -67,11 +70,14 @@ public void setup() { newsActorProbe = testKit.createTestProbe(NewsActor.Message.class); readabilityActorProbe = testKit.createTestProbe(ReadabilityActor.Message.class); + // Create test probe for Sentiment Analyzer Actor + sentimentActorProbe = testKit.createTestProbe(SentimentAnalyzerActor.Command.class); + // Generate unique user ID for each test userId = "test-user-" + System.currentTimeMillis(); // Create UserActor - userActor = testKit.spawn(UserActor.create(userId, mockCache, newsActorProbe.ref(),readabilityActorProbe.ref())); + userActor = testKit.spawn(UserActor.create(userId, mockCache, newsActorProbe.ref(),readabilityActorProbe.ref(), sentimentActorProbe.ref())); } /** @@ -948,16 +954,26 @@ public void testOnNewsResultsDifferentSortBySameQuery() throws Exception { /** * Simulates the ReadabilityActor responding to a CalculateReadability request. * This is necessary because UserActor now waits for readability results before pushing to client. + * + * ADD: after user actor gets the result of Readability, send a new analyze average request to + * sentiment analyzer, and then return with a :-) for testing purpose * @author Luan Tran + * @author Siming Yi */ private void simulateReadabilityResponse() { ReadabilityActor.CalculateReadability readabilityMsg = readabilityActorProbe.expectMessageClass(ReadabilityActor.CalculateReadability.class); - // Send back the searches (in real scenario, ReadabilityActor would calculate scores first) userActor.tell(new UserActor.ReadabilityResult(readabilityMsg.searches)); + + SentimentAnalyzerActor.AnalyzeAverage sentimentMsg = + sentimentActorProbe.expectMessageClass(SentimentAnalyzerActor.AnalyzeAverage.class); + + // any return is ok, will not check for the value of it + sentimentMsg.replyTo.tell(new SentimentAnalyzerActor.Result(":-)")); } + /** * Create a mock Search object with mocked articles using Mockito. * @param query search query @@ -989,4 +1005,66 @@ private Search createMockSearch(String query, String sortBy, int articleCount, i return search; } + /** + * Create a mock Search With blank descriptions. This is a helper for Sentiment analyzer, + * to test the branch of blank description -> return neutral. + * + * @param query search query + * @param sortBy sort type + * @param articleCount number of articles. + * + * @author Siming Yi + * */ + private Search createMockSearchWithBlankDescriptions(String query, String sortBy, int articleCount) { + Search search = mock(Search.class); + + List
articles = new ArrayList<>(); + for (int i = 0; i < articleCount; i++) { + Article article = mock(Article.class); + // complete blank description + when(article.getDescription()).thenReturn(" "); + articles.add(article); + } + + when(search.getResults()).thenReturn(articles); + when(search.getArticlesToDisplay()).thenReturn(articleCount); + when(search.getNewArticleCount(any(Search.class))).thenReturn(0); + when(search.getRawQuery()).thenReturn(query); + when(search.getSortBy()).thenReturn(sortBy); + return search; + } + + /** + * If readability results with no description, should return neutral. + * Should not call Sentiment analyzer actor + * + * @author Siming Yi + */ + @Test + public void testOnReadabilityResult_noDescriptions() { + Search blankSearch = createMockSearchWithBlankDescriptions("blank-query", "popularity", 5); + List searches = Collections.singletonList(blankSearch); + + // Directly send the readability result to UserActor + userActor.tell(new UserActor.ReadabilityResult(searches)); + + // expected not call for sentiment analyzer actor + sentimentActorProbe.expectNoMessage(Duration.ofSeconds(1)); + + // neutral return + verify(blankSearch, times(1)).setSentiment(":-|"); + } + + /** + * If readability results with no history: + * Push empty history, and expected not call for sentiment analyzer actor + * + * @author Siming Yi + */ + @Test + public void testOnReadabilityResult_noHistory() { + userActor.tell(new UserActor.ReadabilityResult(Collections.emptyList())); + sentimentActorProbe.expectNoMessage(Duration.ofSeconds(1)); + } + } \ No newline at end of file From 6e19372f906f2b2c4cbab5d4262c212905902913 Mon Sep 17 00:00:00 2001 From: aVr0Ra Date: Sat, 29 Nov 2025 23:09:04 -0500 Subject: [PATCH 16/18] FIX SupervisorActor imports --- app/actors/SupervisorActor.java | 1 + 1 file changed, 1 insertion(+) diff --git a/app/actors/SupervisorActor.java b/app/actors/SupervisorActor.java index fd0415a..c276924 100644 --- a/app/actors/SupervisorActor.java +++ b/app/actors/SupervisorActor.java @@ -12,6 +12,7 @@ import play.libs.ws.WSClient; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.time.Duration; From 7cdc2f20165c4d5b33b60b52dc53a462ab6888cb Mon Sep 17 00:00:00 2001 From: aVr0Ra Date: Sun, 30 Nov 2025 00:45:08 -0500 Subject: [PATCH 17/18] FIX User actor test for empty articles. --- conf/application.conf | 2 +- test/actors/UserActorTest.java | 11 ++++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/conf/application.conf b/conf/application.conf index 249a5b9..b8d398f 100644 --- a/conf/application.conf +++ b/conf/application.conf @@ -4,4 +4,4 @@ play.modules.enabled += "play.api.cache.ehcache.EhCacheModule" play.modules.enabled += "modules.PekkoModule" -api.key = "1002d5e24b1a4a0b92719ed6281694a9" \ No newline at end of file +api.key = "1b839235c856436eb1bce6e23b08aa5a" \ No newline at end of file diff --git a/test/actors/UserActorTest.java b/test/actors/UserActorTest.java index 7c6cecf..a15d72e 100644 --- a/test/actors/UserActorTest.java +++ b/test/actors/UserActorTest.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import static org.junit.Assert.*; import static org.mockito.Mockito.*; @@ -1280,9 +1281,13 @@ public void testGetStatsWithEmptyArticleList() throws Exception { // Add a search with NO articles (empty list) Search search = createMockSearch("empty-query", "publishedAt", 0, 0); userActor.tell(new UserActor.NewsResults(search)); - simulateReadabilityResponse(); - - // Skip to get history update + + + readabilityActorProbe.expectMessageClass(ReadabilityActor.CalculateReadability.class); + userActor.tell(new UserActor.ReadabilityResult(Collections.singletonList(search))); + + + // Skip to get history update for (int i = 0; i < 5; i++) { JsonNode msg = sub.expectNext(); if (msg.isArray()) break; From a7d1b83069708e1db61b71cb24cbb91f04c7dfe3 Mon Sep 17 00:00:00 2001 From: aVr0Ra Date: Sun, 30 Nov 2025 03:41:53 -0500 Subject: [PATCH 18/18] FIX: auto refresh the page when news updated --- app/actors/NewsActor.java | 73 +++++++++++++++++++++++++++++++++++---- conf/application.conf | 2 +- 2 files changed, 67 insertions(+), 8 deletions(-) diff --git a/app/actors/NewsActor.java b/app/actors/NewsActor.java index 8b4164d..b2e7bd0 100644 --- a/app/actors/NewsActor.java +++ b/app/actors/NewsActor.java @@ -155,25 +155,49 @@ static final class FetchComplete implements Message { * Wrapper that holds a Search object and its watchers. * @author Luan Tran */ +// static class QueryInfo { +// /** The Search model object containing query parameters and fetched results */ +// final Search search; +// +// /** Set of UserActors watching this query and expecting updates */ +// final Set> watchers = new HashSet<>(); +// +// /** +// * Constructs a new QueryInfo with the specified parameters. +// * @author Luan Tran +// * @param query the search query string +// * @param sortBy the sort order for results +// * @param apiKey the News API key +// */ +// QueryInfo(String query, String sortBy, String apiKey) { +// this.search = new Search(query, sortBy, apiKey); +// } +// } static class QueryInfo { - /** The Search model object containing query parameters and fetched results */ - final Search search; + /** Raw query string for this topic */ + final String query; + + /** Sort order ("popularity", "relevancy", "publishedAt") */ + final String sortBy; + + /** Latest Search snapshot for this query */ + Search search; /** Set of UserActors watching this query and expecting updates */ final Set> watchers = new HashSet<>(); /** * Constructs a new QueryInfo with the specified parameters. - * @author Luan Tran - * @param query the search query string - * @param sortBy the sort order for results - * @param apiKey the News API key */ QueryInfo(String query, String sortBy, String apiKey) { + this.query = query; + this.sortBy = sortBy; + // initial snapshot this.search = new Search(query, sortBy, apiKey); } } + // --- Factory --- /** @@ -268,7 +292,7 @@ private Behavior onWatchQuery(WatchQuery msg) { * @param msg the FetchTick message containing the query key * @return Behaviors.same() to continue with the same behavior */ - private Behavior onFetchTick(FetchTick msg) { + /*private Behavior onFetchTick(FetchTick msg) { QueryInfo info = queries.get(msg.queryKey); if (info == null) { // Query was removed, timer should have been cancelled @@ -293,9 +317,44 @@ private Behavior onFetchTick(FetchTick msg) { } }); + return Behaviors.same(); + }*/ + /** + * Handles FetchTick messages to trigger periodic news fetches. + * For each tick, we create a fresh Search snapshot so that UserActor + * can compare "new results" vs "previous results" correctly. + */ + private Behavior onFetchTick(FetchTick msg) { + QueryInfo info = queries.get(msg.queryKey); + if (info == null) { + // Query was removed, timer should have been cancelled + context.getLog().warn("FetchTick for removed query: {}", msg.queryKey); + return Behaviors.same(); + } + + context.getLog().info("Fetching news for query: {} ({} watchers)", + msg.queryKey, info.watchers.size()); + System.out.println(">>> API CALL TRIGGERED for query: " + msg.queryKey + " at " + java.time.Instant.now()); + + ActorRef self = context.getSelf(); + + // Create a new search by original query + sortyby + apiKey + Search newSearch = new Search(info.query, info.sortBy, apiKey); + info.search = newSearch; + + newSearch.fetchResults(ws).whenComplete((v, throwable) -> { + if (throwable != null) { + self.tell(new FetchComplete(msg.queryKey, false, throwable.getMessage())); + } else { + System.out.println(">>> API CALL COMPLETED for query: " + msg.queryKey); + self.tell(new FetchComplete(msg.queryKey, true, null)); + } + }); + return Behaviors.same(); } + /** * Handles FetchComplete messages after API requests complete. * @author Luan Tran diff --git a/conf/application.conf b/conf/application.conf index b8d398f..154352d 100644 --- a/conf/application.conf +++ b/conf/application.conf @@ -4,4 +4,4 @@ play.modules.enabled += "play.api.cache.ehcache.EhCacheModule" play.modules.enabled += "modules.PekkoModule" -api.key = "1b839235c856436eb1bce6e23b08aa5a" \ No newline at end of file +api.key = "a482d6d109894277b9ec6a6d2f4e83e9" \ No newline at end of file