Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 66 additions & 7 deletions app/actors/NewsActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,25 +155,49 @@ static final class FetchComplete implements Message {
* Wrapper that holds a Search object and its watchers.
* @author Luan Tran
*/
// static class QueryInfo {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove commented code

// /** The Search model object containing query parameters and fetched results */
// final Search search;
//
// /** Set of UserActors watching this query and expecting updates */
// final Set<ActorRef<UserActor.Message>> watchers = new HashSet<>();
//
// /**
// * Constructs a new QueryInfo with the specified parameters.
// * @author Luan Tran
// * @param query the search query string
// * @param sortBy the sort order for results
// * @param apiKey the News API key
// */
// QueryInfo(String query, String sortBy, String apiKey) {
// this.search = new Search(query, sortBy, apiKey);
// }
// }
Comment on lines +158 to +175
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, just after Luan review this part

static class QueryInfo {
/** The Search model object containing query parameters and fetched results */
final Search search;
/** Raw query string for this topic */
final String query;

/** Sort order ("popularity", "relevancy", "publishedAt") */
final String sortBy;

/** Latest Search snapshot for this query */
Search search;

/** Set of UserActors watching this query and expecting updates */
final Set<ActorRef<UserActor.Message>> watchers = new HashSet<>();

/**
* Constructs a new QueryInfo with the specified parameters.
* @author Luan Tran
* @param query the search query string
* @param sortBy the sort order for results
* @param apiKey the News API key
*/
QueryInfo(String query, String sortBy, String apiKey) {
this.query = query;
this.sortBy = sortBy;
// initial snapshot
this.search = new Search(query, sortBy, apiKey);
}
}


// --- Factory ---

/**
Expand Down Expand Up @@ -268,7 +292,7 @@ private Behavior<Message> onWatchQuery(WatchQuery msg) {
* @param msg the FetchTick message containing the query key
* @return Behaviors.same() to continue with the same behavior
*/
private Behavior<Message> onFetchTick(FetchTick msg) {
/*private Behavior<Message> onFetchTick(FetchTick msg) {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove comment

QueryInfo info = queries.get(msg.queryKey);
if (info == null) {
// Query was removed, timer should have been cancelled
Expand All @@ -293,9 +317,44 @@ private Behavior<Message> onFetchTick(FetchTick msg) {
}
});

return Behaviors.same();
}*/
/**
* Handles FetchTick messages to trigger periodic news fetches.
* For each tick, we create a fresh Search snapshot so that UserActor
* can compare "new results" vs "previous results" correctly.
*/
private Behavior<Message> onFetchTick(FetchTick msg) {
QueryInfo info = queries.get(msg.queryKey);
if (info == null) {
// Query was removed, timer should have been cancelled
context.getLog().warn("FetchTick for removed query: {}", msg.queryKey);
return Behaviors.same();
}

context.getLog().info("Fetching news for query: {} ({} watchers)",
msg.queryKey, info.watchers.size());
System.out.println(">>> API CALL TRIGGERED for query: " + msg.queryKey + " at " + java.time.Instant.now());

ActorRef<Message> self = context.getSelf();

// Create a new search by original query + sortyby + apiKey
Search newSearch = new Search(info.query, info.sortBy, apiKey);
info.search = newSearch;

newSearch.fetchResults(ws).whenComplete((v, throwable) -> {
if (throwable != null) {
self.tell(new FetchComplete(msg.queryKey, false, throwable.getMessage()));
} else {
System.out.println(">>> API CALL COMPLETED for query: " + msg.queryKey);
self.tell(new FetchComplete(msg.queryKey, true, null));
}
});

return Behaviors.same();
}


/**
* Handles FetchComplete messages after API requests complete.
* @author Luan Tran
Expand Down
88 changes: 88 additions & 0 deletions app/actors/SentimentAnalyzerActor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package actors;

import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import services.SentimentAnalyzer;

import java.util.List;

/**
* Actor wrapper around {@link services.SentimentAnalyzer}.
*
* It receives article text(s), computes the sentiment using the existing
* SentimentAnalyzer service, and replies with an emoji string (":-)", ":-(", ":-|").
*
* @author Siming Yi
*/
public final class SentimentAnalyzerActor {

/** Marker interface for all messages this actor accepts. */
public interface Command {}

/**
* Request to analyze a single article.
*/
public static final class AnalyzeSingle implements Command {
public final String text;
public final ActorRef<Result> replyTo;

public AnalyzeSingle(String text, ActorRef<Result> replyTo) {
this.text = text;
this.replyTo = replyTo;
}
}

/**
* Request to analyze the average sentiment of multiple articles.
*/
public static final class AnalyzeAverage implements Command {
public final List<String> texts;
public final ActorRef<Result> replyTo;

public AnalyzeAverage(List<String> texts, ActorRef<Result> replyTo) {
this.texts = texts;
this.replyTo = replyTo;
}
}

/**
* Response message containing the computed sentiment symbol.
*/
public static final class Result {
public final String sentiment;

public Result(String sentiment) {
this.sentiment = sentiment;
}
}

private SentimentAnalyzerActor() {
// no public constructor
}

/**
* Factory method for creating the actor behavior.
*
* @return a Behavior that processes AnalyzeSingle and AnalyzeAverage messages.
*/
public static Behavior<Command> create() {
return Behaviors.setup(context -> {
// Reuse your existing service class
SentimentAnalyzer analyzer = new SentimentAnalyzer();

return Behaviors.receive(Command.class)
.onMessage(AnalyzeSingle.class, msg -> {
String result = analyzer.analyzeSingleArticle(msg.text);
msg.replyTo.tell(new Result(result));
return Behaviors.same();
})
.onMessage(AnalyzeAverage.class, msg -> {
String result = analyzer.analyzeAverageSentiment(msg.texts);
msg.replyTo.tell(new Result(result));
return Behaviors.same();
})
.build();
});
}
}
19 changes: 15 additions & 4 deletions app/actors/SupervisorActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.actor.typed.SupervisorStrategy;
import org.apache.pekko.actor.typed.Terminated;
import org.apache.pekko.stream.javadsl.Flow;
import com.fasterxml.jackson.databind.JsonNode;
import play.cache.AsyncCacheApi;
import play.libs.ws.WSClient;

import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.time.Duration;

/**
* Parent actor that manages the creation of individual UserActors for each WebSocket connection.
Expand Down Expand Up @@ -149,8 +154,14 @@ public static Behavior<Message> create(WSClient wsClient, String apiKey, AsyncCa

context.getLog().info("ReadabilityActor spawned and ready");

// Registry to track active UserActors by session ID
Map<String, ActorRef<UserActor.Message>> userActors = new HashMap<>();
// Create a single shared SentimentAnalyzerActor for the entire application
ActorRef<SentimentAnalyzerActor.Command> sentimentActor = context.spawn(
SentimentAnalyzerActor.create(),
"sentimentAnalyzerActor"
);
context.getLog().info("sentimentAnalyzerActor spawned and ready");
// Registry to track active UserActors by session ID
Map<String, ActorRef<UserActor.Message>> userActors = new HashMap<>();



Expand All @@ -170,7 +181,7 @@ public static Behavior<Message> create(WSClient wsClient, String apiKey, AsyncCa
context.getLog().info("Creating NEW UserActor for session: {}", create.id);

ActorRef<UserActor.Message> child = context.spawn(
UserActor.create(create.id, asyncCache, newsActor, readabilityActor),
UserActor.create(create.id, asyncCache, newsActor, readabilityActor, sentimentActor),
"userActor-" + create.id
);

Expand Down
Loading