diff --git a/app/actors/NewsActor.java b/app/actors/NewsActor.java index 8b4164d..b2e7bd0 100644 --- a/app/actors/NewsActor.java +++ b/app/actors/NewsActor.java @@ -155,25 +155,49 @@ static final class FetchComplete implements Message { * Wrapper that holds a Search object and its watchers. * @author Luan Tran */ +// static class QueryInfo { +// /** The Search model object containing query parameters and fetched results */ +// final Search search; +// +// /** Set of UserActors watching this query and expecting updates */ +// final Set> watchers = new HashSet<>(); +// +// /** +// * Constructs a new QueryInfo with the specified parameters. +// * @author Luan Tran +// * @param query the search query string +// * @param sortBy the sort order for results +// * @param apiKey the News API key +// */ +// QueryInfo(String query, String sortBy, String apiKey) { +// this.search = new Search(query, sortBy, apiKey); +// } +// } static class QueryInfo { - /** The Search model object containing query parameters and fetched results */ - final Search search; + /** Raw query string for this topic */ + final String query; + + /** Sort order ("popularity", "relevancy", "publishedAt") */ + final String sortBy; + + /** Latest Search snapshot for this query */ + Search search; /** Set of UserActors watching this query and expecting updates */ final Set> watchers = new HashSet<>(); /** * Constructs a new QueryInfo with the specified parameters. - * @author Luan Tran - * @param query the search query string - * @param sortBy the sort order for results - * @param apiKey the News API key */ QueryInfo(String query, String sortBy, String apiKey) { + this.query = query; + this.sortBy = sortBy; + // initial snapshot this.search = new Search(query, sortBy, apiKey); } } + // --- Factory --- /** @@ -268,7 +292,7 @@ private Behavior onWatchQuery(WatchQuery msg) { * @param msg the FetchTick message containing the query key * @return Behaviors.same() to continue with the same behavior */ - private Behavior onFetchTick(FetchTick msg) { + /*private Behavior onFetchTick(FetchTick msg) { QueryInfo info = queries.get(msg.queryKey); if (info == null) { // Query was removed, timer should have been cancelled @@ -293,9 +317,44 @@ private Behavior onFetchTick(FetchTick msg) { } }); + return Behaviors.same(); + }*/ + /** + * Handles FetchTick messages to trigger periodic news fetches. + * For each tick, we create a fresh Search snapshot so that UserActor + * can compare "new results" vs "previous results" correctly. + */ + private Behavior onFetchTick(FetchTick msg) { + QueryInfo info = queries.get(msg.queryKey); + if (info == null) { + // Query was removed, timer should have been cancelled + context.getLog().warn("FetchTick for removed query: {}", msg.queryKey); + return Behaviors.same(); + } + + context.getLog().info("Fetching news for query: {} ({} watchers)", + msg.queryKey, info.watchers.size()); + System.out.println(">>> API CALL TRIGGERED for query: " + msg.queryKey + " at " + java.time.Instant.now()); + + ActorRef self = context.getSelf(); + + // Create a new search by original query + sortyby + apiKey + Search newSearch = new Search(info.query, info.sortBy, apiKey); + info.search = newSearch; + + newSearch.fetchResults(ws).whenComplete((v, throwable) -> { + if (throwable != null) { + self.tell(new FetchComplete(msg.queryKey, false, throwable.getMessage())); + } else { + System.out.println(">>> API CALL COMPLETED for query: " + msg.queryKey); + self.tell(new FetchComplete(msg.queryKey, true, null)); + } + }); + return Behaviors.same(); } + /** * Handles FetchComplete messages after API requests complete. * @author Luan Tran diff --git a/app/actors/SentimentAnalyzerActor.java b/app/actors/SentimentAnalyzerActor.java new file mode 100644 index 0000000..87f10dc --- /dev/null +++ b/app/actors/SentimentAnalyzerActor.java @@ -0,0 +1,88 @@ +package actors; + +import org.apache.pekko.actor.typed.ActorRef; +import org.apache.pekko.actor.typed.Behavior; +import org.apache.pekko.actor.typed.javadsl.Behaviors; +import services.SentimentAnalyzer; + +import java.util.List; + +/** + * Actor wrapper around {@link services.SentimentAnalyzer}. + * + * It receives article text(s), computes the sentiment using the existing + * SentimentAnalyzer service, and replies with an emoji string (":-)", ":-(", ":-|"). + * + * @author Siming Yi + */ +public final class SentimentAnalyzerActor { + + /** Marker interface for all messages this actor accepts. */ + public interface Command {} + + /** + * Request to analyze a single article. + */ + public static final class AnalyzeSingle implements Command { + public final String text; + public final ActorRef replyTo; + + public AnalyzeSingle(String text, ActorRef replyTo) { + this.text = text; + this.replyTo = replyTo; + } + } + + /** + * Request to analyze the average sentiment of multiple articles. + */ + public static final class AnalyzeAverage implements Command { + public final List texts; + public final ActorRef replyTo; + + public AnalyzeAverage(List texts, ActorRef replyTo) { + this.texts = texts; + this.replyTo = replyTo; + } + } + + /** + * Response message containing the computed sentiment symbol. + */ + public static final class Result { + public final String sentiment; + + public Result(String sentiment) { + this.sentiment = sentiment; + } + } + + private SentimentAnalyzerActor() { + // no public constructor + } + + /** + * Factory method for creating the actor behavior. + * + * @return a Behavior that processes AnalyzeSingle and AnalyzeAverage messages. + */ + public static Behavior create() { + return Behaviors.setup(context -> { + // Reuse your existing service class + SentimentAnalyzer analyzer = new SentimentAnalyzer(); + + return Behaviors.receive(Command.class) + .onMessage(AnalyzeSingle.class, msg -> { + String result = analyzer.analyzeSingleArticle(msg.text); + msg.replyTo.tell(new Result(result)); + return Behaviors.same(); + }) + .onMessage(AnalyzeAverage.class, msg -> { + String result = analyzer.analyzeAverageSentiment(msg.texts); + msg.replyTo.tell(new Result(result)); + return Behaviors.same(); + }) + .build(); + }); + } +} diff --git a/app/actors/SupervisorActor.java b/app/actors/SupervisorActor.java index 4b8b4c9..c276924 100644 --- a/app/actors/SupervisorActor.java +++ b/app/actors/SupervisorActor.java @@ -4,12 +4,17 @@ import org.apache.pekko.actor.typed.ActorRef; import org.apache.pekko.actor.typed.Behavior; import org.apache.pekko.actor.typed.javadsl.Behaviors; +import org.apache.pekko.actor.typed.SupervisorStrategy; +import org.apache.pekko.actor.typed.Terminated; import org.apache.pekko.stream.javadsl.Flow; import com.fasterxml.jackson.databind.JsonNode; import play.cache.AsyncCacheApi; import play.libs.ws.WSClient; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.time.Duration; /** * Parent actor that manages the creation of individual UserActors for each WebSocket connection. @@ -149,8 +154,14 @@ public static Behavior create(WSClient wsClient, String apiKey, AsyncCa context.getLog().info("ReadabilityActor spawned and ready"); - // Registry to track active UserActors by session ID - Map> userActors = new HashMap<>(); + // Create a single shared SentimentAnalyzerActor for the entire application + ActorRef sentimentActor = context.spawn( + SentimentAnalyzerActor.create(), + "sentimentAnalyzerActor" + ); + context.getLog().info("sentimentAnalyzerActor spawned and ready"); + // Registry to track active UserActors by session ID + Map> userActors = new HashMap<>(); @@ -170,7 +181,7 @@ public static Behavior create(WSClient wsClient, String apiKey, AsyncCa context.getLog().info("Creating NEW UserActor for session: {}", create.id); ActorRef child = context.spawn( - UserActor.create(create.id, asyncCache, newsActor, readabilityActor), + UserActor.create(create.id, asyncCache, newsActor, readabilityActor, sentimentActor), "userActor-" + create.id ); diff --git a/app/actors/UserActor.java b/app/actors/UserActor.java index f42b1d6..3a59347 100644 --- a/app/actors/UserActor.java +++ b/app/actors/UserActor.java @@ -5,13 +5,20 @@ import org.apache.pekko.actor.typed.ActorRef; import org.apache.pekko.actor.typed. Behavior; import org.apache.pekko.actor.typed.javadsl.ActorContext; -import org.apache.pekko.actor.typed.javadsl. Behaviors; -import org.apache.pekko.stream.Materializer; -import org.apache.pekko.stream.javadsl.*; +import org.apache.pekko.actor.typed.javadsl.AskPattern; +import org.apache.pekko.actor.typed.javadsl.Behaviors; import org.apache.pekko.japi.Pair; +import org.apache.pekko.stream.Materializer; +import org.apache.pekko.stream.javadsl.BroadcastHub; +import org.apache.pekko.stream.javadsl.Flow; +import org.apache.pekko.stream.javadsl.Keep; +import org.apache.pekko.stream.javadsl.MergeHub; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; import com.fasterxml.jackson.databind.JsonNode; import play.cache.AsyncCacheApi; import play.libs.Json; +import play.libs.ws.WSClient; import models.Search; import models.Article; import models.Search; @@ -132,6 +139,8 @@ private static final class InternalStop implements Message {} private final Sink hubSink; private final Flow websocketFlow; + /** Child actor that performs sentiment analysis for this user. */ + private final ActorRef sentimentActor; // TODO rm cache private final AsyncCacheApi asyncCache; @@ -163,8 +172,8 @@ private static class SearchInstance { * @author Nattamon Paiboon */ // TODO rm cache - public static Behavior create(String id, AsyncCacheApi asyncCache, ActorRef newsActor, ActorRef rActor) { - return Behaviors.setup(context -> new UserActor(id, asyncCache, context, newsActor, rActor).behavior()); + public static Behavior create(String id, AsyncCacheApi asyncCache, ActorRef newsActor, ActorRef rActor, ActorRef sentimentActor) { + return Behaviors.setup(context -> new UserActor(id, asyncCache, context, newsActor, rActor, sentimentActor).behavior()); } // --- Constructor --- @@ -178,27 +187,28 @@ public static Behavior create(String id, AsyncCacheApi asyncCache, Acto * @author Nattamon Paiboon */ // TODO rm cache - private UserActor(String id, AsyncCacheApi asyncCache, ActorContext context, ActorRef newsActor, ActorRef rActor) { + private UserActor(String id, AsyncCacheApi asyncCache, ActorContext context, ActorRef newsActor, ActorRef rActor, ActorRef sentimentActor) { this.id = id; this.context = context; this.asyncCache = asyncCache; this.newsActor = newsActor; this.rActor = rActor; + this.sentimentActor = sentimentActor; - Materializer mat = Materializer.matFromSystem(context.getSystem()); + Materializer mat = Materializer.matFromSystem(context.getSystem()); // Spawn StatsActor this.statsActor = context.spawn(StatsActor.create(), "statsActor-" + id); ActorRef self = context.getSelf(); - Pair, Source> sinkSourcePair = - MergeHub.of(JsonNode.class, 16) - .toMat(BroadcastHub.of(JsonNode.class, 256), Keep.both()) - .run(mat); + Pair, Source> sinkSourcePair = + MergeHub.of(JsonNode.class, 16) + .toMat(BroadcastHub.of(JsonNode.class, 256), Keep.both()) + .run(mat); - this.hubSink = sinkSourcePair.first(); - Source hubSource = sinkSourcePair.second(); + this.hubSink = sinkSourcePair.first(); + Source hubSource = sinkSourcePair.second(); // Handle incoming messages from the WebSocket Sink> jsonSink = Sink.foreach((JsonNode json) -> { @@ -228,19 +238,19 @@ private UserActor(String id, AsyncCacheApi asyncCache, ActorContext con } }); - // ping websocket to keep it alive - Source heartbeat = Source.tick( - Duration.ZERO, - Duration.ofSeconds(30), - (JsonNode) Json.newObject().put("type", "ping") - ).mapMaterializedValue(c -> NotUsed.getInstance()); - - this.websocketFlow = Flow.fromSinkAndSourceCoupled(jsonSink, hubSource.merge(heartbeat)) - .watchTermination((n, stage) -> { - stage.whenComplete((done, throwable) -> self.tell(new InternalStop())); - return NotUsed.getInstance(); - }); - } + // ping websocket to keep it alive + Source heartbeat = Source.tick( + Duration.ZERO, + Duration.ofSeconds(30), + (JsonNode) Json.newObject().put("type", "ping") + ).mapMaterializedValue(c -> NotUsed.getInstance()); + + this.websocketFlow = Flow.fromSinkAndSourceCoupled(jsonSink, hubSource.merge(heartbeat)) + .watchTermination((n, stage) -> { + stage.whenComplete((done, throwable) -> self.tell(new InternalStop())); + return NotUsed.getInstance(); + }); + } // --- Behavior --- private Behavior behavior() { @@ -263,7 +273,41 @@ private Behavior onReadabilityResult(ReadabilityResult msg) { .map(Search::getFleschReadingScoreOSS) .toList(); - pushUpdatesToClient(); + // After readability is computed, ask SentimentAnalyzerActor to score the latest search + if (!searchHistory.isEmpty()) { + // we take the latest search (at the front of the history) + Search latest = searchHistory.get(0); + + // collect up to 50 article descriptions as input for sentiment analysis + List texts = latest.getResults().stream() + .limit(50) + .map(article -> { + String desc = article.getDescription(); + return desc != null ? desc : ""; + }) + .filter(desc -> !desc.isBlank()) + .toList(); + + if (texts.isEmpty()) { + // No text to analyse – treat as neutral + latest.setSentiment(":-|"); + pushUpdatesToClient(); + } else { + AskPattern.ask( + sentimentActor, + (ActorRef replyTo) -> + new SentimentAnalyzerActor.AnalyzeAverage(texts, replyTo), + Duration.ofSeconds(2), + context.getSystem().scheduler() + ).thenAccept(result -> { + latest.setSentiment(result.sentiment); + pushUpdatesToClient(); + }); + } + } else { + // Nothing in history – just push what we have (probably empty list) + pushUpdatesToClient(); + } return Behaviors.same(); } diff --git a/conf/application.conf b/conf/application.conf index 249a5b9..154352d 100644 --- a/conf/application.conf +++ b/conf/application.conf @@ -4,4 +4,4 @@ play.modules.enabled += "play.api.cache.ehcache.EhCacheModule" play.modules.enabled += "modules.PekkoModule" -api.key = "1002d5e24b1a4a0b92719ed6281694a9" \ No newline at end of file +api.key = "a482d6d109894277b9ec6a6d2f4e83e9" \ No newline at end of file diff --git a/test/actors/SentimentAnalyzerActorTest.java b/test/actors/SentimentAnalyzerActorTest.java new file mode 100644 index 0000000..e9b6e9b --- /dev/null +++ b/test/actors/SentimentAnalyzerActorTest.java @@ -0,0 +1,91 @@ +package actors; + +import org.apache.pekko.actor.testkit.typed.javadsl.ActorTestKit; +import org.apache.pekko.actor.testkit.typed.javadsl.TestProbe; +import org.apache.pekko.actor.typed.ActorRef; +import org.junit.AfterClass; +import org.junit.Test; + +import static org.junit.Assert.*; + +/** + * Tests for {@link SentimentAnalyzerActor}. + * + * These tests focus on verifying that: + * - the actor replies to AnalyzeSingle / AnalyzeAverage messages, and + * - the reply wraps the sentiment string produced by SentimentAnalyzer + * into a Result message. + * + * The detailed sentiment logic is already covered in SentimentAnalyzer tests. + * + * @author Siming Yi + */ +public class SentimentAnalyzerActorTest { + + private static final ActorTestKit testKit = ActorTestKit.create(); + + @AfterClass + public static void shutdownTestKit() { + testKit.shutdownTestKit(); + } + + @Test + public void analyzeSingle_shouldReplyWithSentimentResult() { + // given + ActorRef actor = + testKit.spawn(SentimentAnalyzerActor.create(), "sentiment-single"); + + TestProbe probe = + testKit.createTestProbe(SentimentAnalyzerActor.Result.class); + + String text = "This is a very happy and joyful day"; + + // when + actor.tell(new SentimentAnalyzerActor.AnalyzeSingle(text, probe.getRef())); + + // then + SentimentAnalyzerActor.Result result = probe.receiveMessage(); + assertNotNull(result); + assertNotNull(result.sentiment); + assertFalse(result.sentiment.isEmpty()); + + // Contract from SentimentAnalyzerActor javadoc: must be one of ":-)", ":-(", ":-|" + assertTrue( + result.sentiment.equals(":-)") + || result.sentiment.equals(":-(") + || result.sentiment.equals(":-|") + ); + } + + @Test + public void analyzeAverage_shouldReplyWithSentimentResult() { + // given + ActorRef actor = + testKit.spawn(SentimentAnalyzerActor.create(), "sentiment-average"); + + TestProbe probe = + testKit.createTestProbe(SentimentAnalyzerActor.Result.class); + + var texts = java.util.List.of( + "Markets are doing well and investors are happy", + "Profits increased and performance was good", + "Overall the outlook is positive and optimistic" + ); + + // when + actor.tell(new SentimentAnalyzerActor.AnalyzeAverage(texts, probe.getRef())); + + // then + SentimentAnalyzerActor.Result result = probe.receiveMessage(); + assertNotNull(result); + assertNotNull(result.sentiment); + assertFalse(result.sentiment.isEmpty()); + + // same contract: sentiment must be one of the three emojis + assertTrue( + result.sentiment.equals(":-)") + || result.sentiment.equals(":-(") + || result.sentiment.equals(":-|") + ); + } +} diff --git a/test/actors/UserActorTest.java b/test/actors/UserActorTest.java index a8daaec..a15d72e 100644 --- a/test/actors/UserActorTest.java +++ b/test/actors/UserActorTest.java @@ -23,9 +23,12 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import static org.junit.Assert.*; import static org.mockito.Mockito.*; @@ -41,6 +44,7 @@ public class UserActorTest { private AsyncCacheApi mockCache; private TestProbe newsActorProbe; private TestProbe readabilityActorProbe; + private TestProbe sentimentActorProbe; private ActorRef userActor; private String userId; @@ -68,11 +72,14 @@ public void setup() { newsActorProbe = testKit.createTestProbe(NewsActor.Message.class); readabilityActorProbe = testKit.createTestProbe(ReadabilityActor.Message.class); + // Create test probe for Sentiment Analyzer Actor + sentimentActorProbe = testKit.createTestProbe(SentimentAnalyzerActor.Command.class); + // Generate unique user ID for each test userId = "test-user-" + System.currentTimeMillis(); // Create UserActor - userActor = testKit.spawn(UserActor.create(userId, mockCache, newsActorProbe.ref(),readabilityActorProbe.ref())); + userActor = testKit.spawn(UserActor.create(userId, mockCache, newsActorProbe.ref(),readabilityActorProbe.ref(), sentimentActorProbe.ref())); } /** @@ -1274,9 +1281,13 @@ public void testGetStatsWithEmptyArticleList() throws Exception { // Add a search with NO articles (empty list) Search search = createMockSearch("empty-query", "publishedAt", 0, 0); userActor.tell(new UserActor.NewsResults(search)); - simulateReadabilityResponse(); - - // Skip to get history update + + + readabilityActorProbe.expectMessageClass(ReadabilityActor.CalculateReadability.class); + userActor.tell(new UserActor.ReadabilityResult(Collections.singletonList(search))); + + + // Skip to get history update for (int i = 0; i < 5; i++) { JsonNode msg = sub.expectNext(); if (msg.isArray()) break; @@ -1350,16 +1361,26 @@ public void testStatsFlowCompletesSuccessfully() throws Exception { /** * Simulates the ReadabilityActor responding to a CalculateReadability request. * This is necessary because UserActor now waits for readability results before pushing to client. + * + * ADD: after user actor gets the result of Readability, send a new analyze average request to + * sentiment analyzer, and then return with a :-) for testing purpose * @author Luan Tran + * @author Siming Yi */ private void simulateReadabilityResponse() { ReadabilityActor.CalculateReadability readabilityMsg = readabilityActorProbe.expectMessageClass(ReadabilityActor.CalculateReadability.class); - // Send back the searches (in real scenario, ReadabilityActor would calculate scores first) userActor.tell(new UserActor.ReadabilityResult(readabilityMsg.searches)); + + SentimentAnalyzerActor.AnalyzeAverage sentimentMsg = + sentimentActorProbe.expectMessageClass(SentimentAnalyzerActor.AnalyzeAverage.class); + + // any return is ok, will not check for the value of it + sentimentMsg.replyTo.tell(new SentimentAnalyzerActor.Result(":-)")); } + /** * Create a mock Search object with mocked articles using Mockito. * @param query search query @@ -1391,4 +1412,66 @@ private Search createMockSearch(String query, String sortBy, int articleCount, i return search; } + /** + * Create a mock Search With blank descriptions. This is a helper for Sentiment analyzer, + * to test the branch of blank description -> return neutral. + * + * @param query search query + * @param sortBy sort type + * @param articleCount number of articles. + * + * @author Siming Yi + * */ + private Search createMockSearchWithBlankDescriptions(String query, String sortBy, int articleCount) { + Search search = mock(Search.class); + + List
articles = new ArrayList<>(); + for (int i = 0; i < articleCount; i++) { + Article article = mock(Article.class); + // complete blank description + when(article.getDescription()).thenReturn(" "); + articles.add(article); + } + + when(search.getResults()).thenReturn(articles); + when(search.getArticlesToDisplay()).thenReturn(articleCount); + when(search.getNewArticleCount(any(Search.class))).thenReturn(0); + when(search.getRawQuery()).thenReturn(query); + when(search.getSortBy()).thenReturn(sortBy); + return search; + } + + /** + * If readability results with no description, should return neutral. + * Should not call Sentiment analyzer actor + * + * @author Siming Yi + */ + @Test + public void testOnReadabilityResult_noDescriptions() { + Search blankSearch = createMockSearchWithBlankDescriptions("blank-query", "popularity", 5); + List searches = Collections.singletonList(blankSearch); + + // Directly send the readability result to UserActor + userActor.tell(new UserActor.ReadabilityResult(searches)); + + // expected not call for sentiment analyzer actor + sentimentActorProbe.expectNoMessage(Duration.ofSeconds(1)); + + // neutral return + verify(blankSearch, times(1)).setSentiment(":-|"); + } + + /** + * If readability results with no history: + * Push empty history, and expected not call for sentiment analyzer actor + * + * @author Siming Yi + */ + @Test + public void testOnReadabilityResult_noHistory() { + userActor.tell(new UserActor.ReadabilityResult(Collections.emptyList())); + sentimentActorProbe.expectNoMessage(Duration.ofSeconds(1)); + } + } \ No newline at end of file