Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions app/actors/StatsActor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package actors;

import models.Article;
import models.Stats;
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.actor.typed.javadsl.ActorContext;

import java.util.List;
import java.util.Map;

/**
* Actor responsible for calculating statistics on articles.
* It receives a list of articles and computes word frequency statistics,
* then sends the results back to the requester.
* @author Dara Cunningham
*/
public class StatsActor {
public interface Message {}

/**
* Request to compute word statistics for a list of articles
*/
public static final class ComputeStats implements Message {
final String query;
final List<Article> articles;
final ActorRef<UserActor.Message> replyTo;

public ComputeStats(String query, List<Article> articles, ActorRef<UserActor.Message> replyTo) {
this.query = query;
this.articles = articles;
this.replyTo = replyTo;
}
}

/**
* Response containing the computed word statistics.
*/
public record StatsResult(
boolean found,
String query,
List<Map.Entry<String, Integer>> sortedCounts
) {}

/**
* Internal message when computation completes successfully
*/
private static final class ComputeComplete implements Message {
final String query;
final List<Map.Entry<String, Integer>> sortedCounts;
final ActorRef<UserActor.Message> replyTo;

ComputeComplete(String query, List<Map.Entry<String, Integer>> sortedCounts, ActorRef<UserActor.Message> replyTo) {
this.query = query;
this.sortedCounts = sortedCounts;
this.replyTo = replyTo;
}
}

/**
* Internal message when computation fails
*/
private static final class ComputeFailed implements Message {
final String query;
final String error;
final ActorRef<UserActor.Message> replyTo;

ComputeFailed(String query, String error, ActorRef<UserActor.Message> replyTo) {
this.query = query;
this.error = error;
this.replyTo = replyTo;
}
}

// --- State ---
private final ActorContext<Message> context;

// --- Factory ---
public static Behavior<Message> create() {
return Behaviors.setup(context -> new StatsActor(context).behavior());
}

// --- Constructor ---
private StatsActor(ActorContext<Message> context) {
this.context = context;
}

// --- Behavior ---
private Behavior<Message> behavior() {
return Behaviors.receive(Message.class)
.onMessage(ComputeStats.class, this::onComputeStats)
.onMessage(ComputeComplete.class, this::onComputeComplete)
.onMessage(ComputeFailed.class, this::onComputeFailed)
.build();
}

private Behavior<Message> onComputeStats(ComputeStats msg) {
ActorRef<Message> self = context.getSelf();

try {
context.getLog().info("Computing stats for query: {} ({} articles)",
msg.query, msg.articles.size());

// Compute word statistics using existing Stats utility
List<Map.Entry<String, Integer>> sortedCounts = Stats.calculateSortedCounts(msg.articles);

// Send completion message to self
self.tell(new ComputeComplete(msg.query, sortedCounts, msg.replyTo));

} catch (Exception e) {
context.getLog().error("Failed to compute stats for query: {}", msg.query, e);
self.tell(new ComputeFailed(msg.query, e.getMessage(), msg.replyTo));
}

return Behaviors.same();
}

private Behavior<Message> onComputeComplete(ComputeComplete msg) {
context.getLog().info("Stats computed successfully for query: {} - {} unique words",
msg.query, msg.sortedCounts.size());

// Reply to the requesting UserActor with the results
msg.replyTo.tell(new UserActor.StatsResult(true, msg.query, msg.sortedCounts));

return Behaviors.same();
}

private Behavior<Message> onComputeFailed(ComputeFailed msg) {
context.getLog().error("Stats computation failed for query: {}, error: {}",
msg.query, msg.error);

// Reply to the requesting UserActor with failure
msg.replyTo.tell(new UserActor.StatsResult(false, msg.query, List.of()));

return Behaviors.same();
}
}
144 changes: 107 additions & 37 deletions app/actors/SupervisorActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,13 @@
import play.cache.AsyncCacheApi;
import play.libs.ws.WSClient;

import java.util.HashMap;
import java.util.Map;
import java.util.*;

/**
* Parent actor that manages the creation of individual UserActors
* for each WebSocket connection and a shared NewsActor.
*
* @author Nattamon Paiboon
* Parent actor that manages the creation of individual UserActors for each WebSocket connection.
*/
public final class SupervisorActor {
private SupervisorActor() {}
private SupervisorActor() {}

/** Base interface for all SupervisorActor messages. */
public interface Message {}
Expand All @@ -42,6 +38,46 @@ public Create(String id, ActorRef<Flow<JsonNode, JsonNode, NotUsed>> replyTo) {
}
}

/**
* Message to request stats for a user's query.
* @author Dara Cunningham
*/
public static final class GetStats implements Message {
public final String userId;
public final String query;
public final ActorRef<StatsResponse> replyTo;

public GetStats(String userId, String query, ActorRef<StatsResponse> replyTo) {
this.userId = userId;
this.query = query;
this.replyTo = replyTo;
}
}

/**
* Internal message to receive stats from UserActor.
* @author Dara Cunningham
*/
private static final class UserStatsResponse implements Message {
final UserActor.StatsResult result;
final ActorRef<StatsResponse> originalReplyTo;

UserStatsResponse(UserActor.StatsResult result, ActorRef<StatsResponse> originalReplyTo) {
this.result = result;
this.originalReplyTo = originalReplyTo;
}
}

/**
* Response message containing stats results (sent back to HomeController).
* @author Dara Cunningham
*/
public record StatsResponse(
boolean found,
String query,
List<Map.Entry<String, Integer>> sortedCounts
) {}

/**
* For get other additional actors which will be used in controller ( sourceActor and sourceInfoActor )
*/
Expand Down Expand Up @@ -87,34 +123,34 @@ private static final class UserActorStopped implements Message {
* @author Nattamon Paiboon, Luan Tran
* */
public static Behavior<Message> create(WSClient wsClient, String apiKey, AsyncCacheApi asyncCache) {
return Behaviors.setup(context -> {
// create source actor
ActorRef<SourceActor.Message> sourceActor = context.spawn(
SourceActor.create(wsClient),
"sourceActor"
);
// create sourceInfo actor
ActorRef<SourcesInfoActor.Message> sourcesInfoActor = context.spawn(
SourcesInfoActor.create(wsClient, apiKey),
"sourcesInfoActor"
);
context.getLog().info("sourceActor and sourcesInfoActor spawned and ready");
return Behaviors.setup(context -> {
// create source actor
ActorRef<SourceActor.Message> sourceActor = context.spawn(
SourceActor.create(wsClient),
"sourceActor"
);
// create sourceInfo actor
ActorRef<SourcesInfoActor.Message> sourcesInfoActor = context.spawn(
SourcesInfoActor.create(wsClient, apiKey),
"sourcesInfoActor"
);
context.getLog().info("sourceActor and sourcesInfoActor spawned and ready");

// Spawn a single NewsActor that will be shared by all UserActors
ActorRef<NewsActor.Message> newsActor = context.spawn(
NewsActor.create(wsClient, apiKey), "newsActor"
);
context.getLog().info("NewsActor spawned and ready");
// Spawn a single NewsActor that will be shared by all UserActors
ActorRef<NewsActor.Message> newsActor = context.spawn(
NewsActor.create(wsClient, apiKey), "newsActor"
);
context.getLog().info("NewsActor spawned and ready");

// Spawn a single ReadabilityActor that will be shared by all UserActors
ActorRef<ReadabilityActor.Message> readabilityActor = context.spawn(
ReadabilityActor.create(), "readabilityActor"
);
// Spawn a single ReadabilityActor that will be shared by all UserActors
ActorRef<ReadabilityActor.Message> readabilityActor = context.spawn(
ReadabilityActor.create(), "readabilityActor"
);

context.getLog().info("ReadabilityActor spawned and ready");
context.getLog().info("ReadabilityActor spawned and ready");

// Registry to track active UserActors by session ID
Map<String, ActorRef<UserActor.Message>> userActors = new HashMap<>();
// Registry to track active UserActors by session ID
Map<String, ActorRef<UserActor.Message>> userActors = new HashMap<>();



Expand Down Expand Up @@ -144,16 +180,50 @@ public static Behavior<Message> create(WSClient wsClient, String apiKey, AsyncCa
// Watch the actor to know when it stops (only happens after timeout or explicit stop)
context.watchWith(child, new UserActorStopped(create.id));

// Ask the child to establish the stream
child.tell(new UserActor.Connect(create.replyTo));
}

// Ask the child to establish the stream
child.tell(new UserActor.Connect(create.replyTo));
}
return Behaviors.same();
})
.onMessage(GetOtherActors.class, msg -> {
})
.onMessage(GetStats.class, msg -> {
context.getLog().info("Received GetStats request for session: {}, query: {}",
msg. userId, msg.query);

ActorRef<UserActor.Message> userActor = userActors.get(msg.userId);

if (userActor == null) {
context. getLog().info("No UserActor found for session: {}", msg.userId);
msg.replyTo.tell(new StatsResponse(false, msg.query, List.of()));
} else {
ActorRef<UserActor.StatsResult> adapter = context. messageAdapter(
UserActor. StatsResult.class,
result -> new UserStatsResponse(result, msg.replyTo)
);
userActor.tell(new UserActor.GetStats(msg.query, adapter));
}

return Behaviors.same();
})
.onMessage(UserStatsResponse.class, msg -> {
context.getLog().info("Received stats response for query: {}", msg. result.query());

msg.originalReplyTo.tell(new StatsResponse(
msg.result.found(),
msg.result.query(),
msg.result.sortedCounts()
));

return Behaviors.same();
})
.onMessage(GetOtherActors.class, msg -> {
msg.replyTo.tell(new OtherActors(sourceActor, sourcesInfoActor));
return Behaviors.same();
})
.onMessage(UserActorStopped.class, stopped -> {
context.getLog(). info("UserActor terminated for session: {}, removing from registry", stopped. id);
userActors.remove(stopped.id);
return Behaviors.same();
})
.build();
});
}
Expand Down
Loading