diff --git a/api-service-app/build.gradle.kts b/api-service-app/build.gradle.kts index 452859d..e071c96 100644 --- a/api-service-app/build.gradle.kts +++ b/api-service-app/build.gradle.kts @@ -7,6 +7,7 @@ dependencies { implementation(project(":common")) implementation(project(":infra-redis")) implementation(project(":logic-module")) + implementation(project(":persistence-module")) implementation(project(":protocol")) implementation(project(":multimedia-module")) implementation("io.lettuce:lettuce-core:6.7.1.RELEASE") @@ -16,6 +17,9 @@ dependencies { implementation("io.micronaut:micronaut-runtime:4.9.0") implementation("io.micronaut:micronaut-http-server-netty:4.9.0") implementation("io.micronaut:micronaut-jackson-databind:4.9.0") + implementation("org.flywaydb:flyway-core:10.20.1") + implementation("org.flywaydb:flyway-database-postgresql:10.20.1") + implementation("org.postgresql:postgresql:42.7.5") runtimeOnly("ch.qos.logback:logback-classic:1.5.18") runtimeOnly("org.yaml:snakeyaml:2.4") diff --git a/api-service-app/src/main/java/com/github/lystran/mochat/apiservice/runtime/ApiServiceRuntimeFactory.java b/api-service-app/src/main/java/com/github/lystran/mochat/apiservice/runtime/ApiServiceRuntimeFactory.java index 8c1923d..7d2c26c 100644 --- a/api-service-app/src/main/java/com/github/lystran/mochat/apiservice/runtime/ApiServiceRuntimeFactory.java +++ b/api-service-app/src/main/java/com/github/lystran/mochat/apiservice/runtime/ApiServiceRuntimeFactory.java @@ -1,6 +1,7 @@ package com.github.lystran.mochat.apiservice.runtime; import com.github.lystran.mochat.common.event.EventBus; +import com.github.lystran.mochat.common.id.IdGenerator; import com.github.lystran.mochat.common.offline.OfflineQueue; import com.github.lystran.mochat.infra.redis.RedisEventBus; import com.github.lystran.mochat.infra.redis.RedisOfflineQueue; @@ -12,13 +13,57 @@ import io.micronaut.context.annotation.Factory; import io.micronaut.context.annotation.Property; import io.micronaut.context.annotation.Requires; +import jakarta.annotation.PostConstruct; import jakarta.inject.Singleton; +import org.flywaydb.core.Flyway; +import org.postgresql.ds.PGSimpleDataSource; + +import javax.sql.DataSource; /** - * 按配置创建 `api-service` 运行时要用到的 Redis 相关组件。 + * 按配置创建 `api-service` 运行时要用到的基础组件。 */ @Factory public final class ApiServiceRuntimeFactory { + private static final long CUSTOM_EPOCH_MILLIS = 1_704_067_200_000L; + private static final int WORKER_ID_BITS = 10; + private static final int SEQUENCE_BITS = 12; + private static final long MAX_WORKER_ID = (1L << WORKER_ID_BITS) - 1; + private static final long MAX_SEQUENCE = (1L << SEQUENCE_BITS) - 1; + private static final int TIMESTAMP_SHIFT = WORKER_ID_BITS + SEQUENCE_BITS; + private static final int WORKER_ID_SHIFT = SEQUENCE_BITS; + + /** + * 在启用 PostgreSQL 依赖时创建数据源。 + */ + @Singleton + @Requires(property = "mochat.api-service.dependencies.postgres-enabled", notEquals = "false", defaultValue = "true") + @Requires(missingBeans = DataSource.class) + DataSource dataSource( + @Property(name = "mochat.postgres.url") String url, + @Property(name = "mochat.postgres.username") String username, + @Property(name = "mochat.postgres.password") String password + ) { + PGSimpleDataSource dataSource = new PGSimpleDataSource(); + dataSource.setURL(url); + dataSource.setUser(username); + dataSource.setPassword(password); + return dataSource; + } + + /** + * 创建 Flyway,用来管理 api-service 依赖的社交关系读写表结构。 + */ + @Singleton + @Requires(bean = DataSource.class) + @Requires(missingBeans = Flyway.class) + Flyway flyway(DataSource dataSource, @Property(name = "mochat.flyway.locations") String flywayLocations) { + return Flyway.configure() + .dataSource(dataSource) + .locations(flywayLocations) + .load(); + } + /** * 在启用 Redis 依赖时创建 Redis 客户端。 */ @@ -81,4 +126,70 @@ EventBus eventBus( OfflineQueue offlineQueue(RedisCommands redisCommands) { return new RedisOfflineQueue(redisCommands); } + + /** + * 创建 api-service 用于用户、好友、群组等业务写入的全局 id 生成器。 + */ + @Singleton + @Requires(missingBeans = IdGenerator.class) + IdGenerator idGenerator(@Property(name = "mochat.api-service.id.worker-id", defaultValue = "2") long workerId) { + return new ApiServiceSnowflakeIdGenerator(workerId); + } + + private static final class ApiServiceSnowflakeIdGenerator implements IdGenerator { + private final long workerId; + private long lastTimestamp = -1L; + private long sequence; + + ApiServiceSnowflakeIdGenerator(long workerId) { + if (workerId < 0 || workerId > MAX_WORKER_ID) { + throw new IllegalArgumentException("workerId must be between 0 and " + MAX_WORKER_ID); + } + this.workerId = workerId; + } + + @Override + public synchronized long nextId() { + long timestamp = System.currentTimeMillis(); + if (timestamp < lastTimestamp) { + throw new IllegalStateException("system clock moved backwards"); + } + if (timestamp == lastTimestamp) { + sequence = (sequence + 1) & MAX_SEQUENCE; + if (sequence == 0L) { + timestamp = waitForNextMillis(lastTimestamp); + } + } else { + sequence = 0L; + } + lastTimestamp = timestamp; + return ((timestamp - CUSTOM_EPOCH_MILLIS) << TIMESTAMP_SHIFT) + | (workerId << WORKER_ID_SHIFT) + | sequence; + } + + private long waitForNextMillis(long previousTimestamp) { + long timestamp = System.currentTimeMillis(); + while (timestamp <= previousTimestamp) { + timestamp = System.currentTimeMillis(); + } + return timestamp; + } + } +} + +@Singleton +@Requires(bean = Flyway.class) +@Requires(property = "mochat.api-service.flyway.migrate-on-start", value = "true", defaultValue = "true") +final class ApiServiceFlywayMigrationBootstrap { + private final Flyway flyway; + + ApiServiceFlywayMigrationBootstrap(Flyway flyway) { + this.flyway = flyway; + } + + @PostConstruct + void migrate() { + flyway.migrate(); + } } diff --git a/api-service-app/src/main/resources/application.yml b/api-service-app/src/main/resources/application.yml index 12284d9..f045132 100644 --- a/api-service-app/src/main/resources/application.yml +++ b/api-service-app/src/main/resources/application.yml @@ -4,8 +4,28 @@ micronaut: server: host: ${MOCHAT_API_SERVICE_HTTP_HOST:0.0.0.0} port: ${MOCHAT_API_SERVICE_HTTP_PORT:8080} + cors: + enabled: true + configurations: + web: + allowed-origins: + - '*' + allowed-methods: + - GET + - POST + - PUT + - DELETE + - OPTIONS + allowed-headers: + - '*' mochat: + flyway: + locations: classpath:db/migration + postgres: + url: ${MOCHAT_POSTGRES_URL:`jdbc:postgresql://127.0.0.1:15432/mochat`} + username: ${MOCHAT_POSTGRES_USERNAME:mochat} + password: ${MOCHAT_POSTGRES_PASSWORD:mochat} redis: uri: '${MOCHAT_REDIS_URI:`redis://127.0.0.1:6379`}' api-service: @@ -14,6 +34,10 @@ mochat: port: ${MOCHAT_API_SERVICE_HTTP_PORT:8080} grpc: port: ${MOCHAT_API_SERVICE_GRPC_PORT:19091} + flyway: + migrate-on-start: ${MOCHAT_API_SERVICE_FLYWAY_MIGRATE_ON_START:true} + id: + worker-id: ${MOCHAT_API_SERVICE_ID_WORKER_ID:2} dependencies: redis-enabled: ${MOCHAT_API_SERVICE_REDIS_ENABLED:true} postgres-enabled: ${MOCHAT_API_SERVICE_POSTGRES_ENABLED:true} diff --git a/api-service-app/src/test/resources/application.yml b/api-service-app/src/test/resources/application.yml new file mode 100644 index 0000000..3e727f0 --- /dev/null +++ b/api-service-app/src/test/resources/application.yml @@ -0,0 +1,14 @@ +micronaut: + application: + name: api-service + +mochat: + redis: + uri: redis://127.0.0.1:6379 + api-service: + dependencies: + redis-enabled: true + postgres-enabled: false + message-service: + inbound-consumer: + enabled: false diff --git a/call-module/src/main/java/com/github/lystran/mochat/call/controller/CallController.java b/call-module/src/main/java/com/github/lystran/mochat/call/controller/CallController.java index 893dd62..0ce7280 100644 --- a/call-module/src/main/java/com/github/lystran/mochat/call/controller/CallController.java +++ b/call-module/src/main/java/com/github/lystran/mochat/call/controller/CallController.java @@ -30,7 +30,25 @@ public HttpResponse invitePrivateCall(@Body PrivateCallInviteRequest request) return unauthorized(); } try { - return HttpResponse.ok(ApiResponse.ok(callService.invitePrivateCall(userId.get(), request.toUserId()))); + return HttpResponse.ok(ApiResponse.ok(callService.invitePrivateCall(userId.get(), request.toUserId(), request.callKind()))); + } catch (IllegalArgumentException | IllegalStateException exception) { + return HttpResponse.badRequest(ApiResponse.error(exception.getMessage())); + } + } + + @Post("/private/signal") + public HttpResponse signalPrivateCall(@Body PrivateCallSignalRequest request) { + var userId = resolveSession(request.sessionId()); + if (userId.isEmpty()) { + return unauthorized(); + } + try { + return HttpResponse.ok(ApiResponse.ok(callService.forwardPrivateSignal( + userId.get(), + request.toUserId(), + request.type(), + request.roomName() + ))); } catch (IllegalArgumentException | IllegalStateException exception) { return HttpResponse.badRequest(ApiResponse.error(exception.getMessage())); } @@ -97,7 +115,10 @@ public static ApiResponse error(String message) { } } - public record PrivateCallInviteRequest(String sessionId, long toUserId) { + public record PrivateCallInviteRequest(String sessionId, long toUserId, String callKind) { + } + + public record PrivateCallSignalRequest(String sessionId, long toUserId, String type, String roomName) { } public record GroupCallStartRequest(String sessionId, long groupId) { diff --git a/call-module/src/main/java/com/github/lystran/mochat/call/dto/CallSignalMessage.java b/call-module/src/main/java/com/github/lystran/mochat/call/dto/CallSignalMessage.java index d5b09ad..763dd66 100644 --- a/call-module/src/main/java/com/github/lystran/mochat/call/dto/CallSignalMessage.java +++ b/call-module/src/main/java/com/github/lystran/mochat/call/dto/CallSignalMessage.java @@ -8,6 +8,7 @@ public record CallSignalMessage( long toUserId, long groupId, String roomName, + String callKind, long timestampMillis ) { } diff --git a/call-module/src/main/java/com/github/lystran/mochat/call/mq/CallOfflineNotificationMqProducer.java b/call-module/src/main/java/com/github/lystran/mochat/call/mq/CallOfflineNotificationMqProducer.java index f32ffae..50843fc 100644 --- a/call-module/src/main/java/com/github/lystran/mochat/call/mq/CallOfflineNotificationMqProducer.java +++ b/call-module/src/main/java/com/github/lystran/mochat/call/mq/CallOfflineNotificationMqProducer.java @@ -5,6 +5,7 @@ import com.github.lystran.mochat.call.dto.CallSignalMessage; import io.micronaut.context.annotation.Property; +import io.micronaut.context.annotation.Requires; import jakarta.inject.Singleton; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; @@ -17,6 +18,7 @@ /** 群通话离线通知的 MQ 生产者。 */ @Singleton +@Requires(property = "mochat.call-service.dependencies.mq-enabled", notEquals = "false", defaultValue = "true") public final class CallOfflineNotificationMqProducer { public static final String DEFAULT_TOPIC = "mochat.call.offline-notifications"; diff --git a/call-module/src/main/java/com/github/lystran/mochat/call/service/CallOfflineNotificationService.java b/call-module/src/main/java/com/github/lystran/mochat/call/service/CallOfflineNotificationService.java index 06ae559..e88884e 100644 --- a/call-module/src/main/java/com/github/lystran/mochat/call/service/CallOfflineNotificationService.java +++ b/call-module/src/main/java/com/github/lystran/mochat/call/service/CallOfflineNotificationService.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.Optional; /** 处理群通话离线通知的入库和上线补推。 */ @Singleton @@ -41,7 +42,7 @@ public CallOfflineNotificationService( CallRoomManager callRoomManager, CallRelationshipService relationshipService, CallSignalGateway signalGateway, - CallOfflineNotificationMqProducer mqProducer + Optional mqProducer ) { this.sqlSessionFactory = Objects.requireNonNull(sqlSessionFactory, "sqlSessionFactory"); this.objectMapper = Objects.requireNonNull(objectMapper, "objectMapper"); @@ -49,7 +50,7 @@ public CallOfflineNotificationService( this.callRoomManager = Objects.requireNonNull(callRoomManager, "callRoomManager"); this.relationshipService = Objects.requireNonNull(relationshipService, "relationshipService"); this.signalGateway = Objects.requireNonNull(signalGateway, "signalGateway"); - this.mqProducer = Objects.requireNonNull(mqProducer, "mqProducer"); + this.mqProducer = mqProducer.orElse(null); } public void enqueue(CallSignalMessage message) { @@ -76,7 +77,7 @@ public void enqueueBatch(List messages) { if (messages == null || messages.isEmpty()) { return; } - if (mqProducer.sendBatch(messages)) { + if (mqProducer != null && mqProducer.sendBatch(messages)) { return; } // MQ 发送失败,同步兜底 diff --git a/call-module/src/main/java/com/github/lystran/mochat/call/service/CallService.java b/call-module/src/main/java/com/github/lystran/mochat/call/service/CallService.java index de9abcf..9e7c79b 100644 --- a/call-module/src/main/java/com/github/lystran/mochat/call/service/CallService.java +++ b/call-module/src/main/java/com/github/lystran/mochat/call/service/CallService.java @@ -50,8 +50,13 @@ public CallService( } public PrivateCallInviteResult invitePrivateCall(long fromUserId, long toUserId) { + return invitePrivateCall(fromUserId, toUserId, "voice"); + } + + public PrivateCallInviteResult invitePrivateCall(long fromUserId, long toUserId, String callKind) { requireDifferentPositiveUsers(fromUserId, toUserId); requireActivePrivateRelationship(fromUserId, toUserId); + String normalizedCallKind = normalizeCallKind(callKind); String callId = newCallId(); CallRoomState room = callRoomManager.createPrivateRoom(fromUserId, toUserId, callId); @@ -63,10 +68,11 @@ public PrivateCallInviteResult invitePrivateCall(long fromUserId, long toUserId) toUserId, 0L, room.roomName(), + normalizedCallKind, clock.millis() )); if(!callInvite){ - //插入离线消息 + // Keep the room alive so the callee can accept a queued invite after reconnecting. offlineNotificationService.enqueueBatch(List.of(new CallSignalMessage( "call_invite", callId, @@ -74,13 +80,13 @@ public PrivateCallInviteResult invitePrivateCall(long fromUserId, long toUserId) toUserId, -1, room.roomName(), + normalizedCallKind, clock.millis() ))); - callRoomManager.endRoom(room.roomName()); } - String token = callInvite ? callTokenService.issueToken(fromUserId, room.roomName()):null; - String livekitUrl = callInvite ? callTokenService.livekitUrl():null; - return new PrivateCallInviteResult(callId, room.roomName(), fromUserId, toUserId, token, livekitUrl); + String token = callTokenService.issueToken(fromUserId, room.roomName()); + String livekitUrl = callTokenService.livekitUrl(); + return new PrivateCallInviteResult(callId, room.roomName(), fromUserId, toUserId, normalizedCallKind, token, livekitUrl); } public PrivateSignalResult forwardPrivateSignal(long fromUserId, long toUserId, String type, String roomName) { @@ -115,12 +121,13 @@ public PrivateSignalResult forwardPrivateSignal(long fromUserId, long toUserId, toUserId, 0L, parsed.value(), + "voice", clock.millis() )); String token = "call_accept".equals(normalizedType) ? callTokenService.issueToken(fromUserId, parsed.value()) : null; String livekitUrl = token == null ? null : callTokenService.livekitUrl(); - return new PrivateSignalResult(delivered, token, livekitUrl); + return new PrivateSignalResult(delivered, parsed.value(), token, livekitUrl); } public GroupCallStartResult startGroupCall(long fromUserId, long groupId) { @@ -148,6 +155,7 @@ public GroupCallStartResult startGroupCall(long fromUserId, long groupId) { memberUserId, groupId, room.roomName(), + "voice", clock.millis() ); if (signalGateway.sendToUser(memberUserId, message)) { @@ -214,6 +222,7 @@ private void notifyGroupParticipants(CallRoomName roomName, long actorUserId, St recipientUserId, roomName.groupId(), roomName.value(), + "voice", clock.millis() )); } @@ -265,6 +274,17 @@ private static String normalizeSignalType(String type) { return type.trim().toLowerCase(Locale.ROOT); } + private static String normalizeCallKind(String callKind) { + if (callKind == null || callKind.isBlank()) { + return "voice"; + } + String normalized = callKind.trim().toLowerCase(Locale.ROOT); + if (!"voice".equals(normalized) && !"video".equals(normalized)) { + throw new IllegalArgumentException("callKind must be voice or video"); + } + return normalized; + } + private static String newCallId() { return UUID.randomUUID().toString().replace("-", ""); } @@ -274,12 +294,13 @@ public record PrivateCallInviteResult( String roomName, long fromUserId, long toUserId, + String callKind, String token, String livekitUrl ) { } - public record PrivateSignalResult(boolean delivered, String token, String livekitUrl) { + public record PrivateSignalResult(boolean delivered, String roomName, String token, String livekitUrl) { } public record GroupCallStartResult( diff --git a/call-module/src/main/java/com/github/lystran/mochat/call/websocket/CallSignalGateway.java b/call-module/src/main/java/com/github/lystran/mochat/call/websocket/CallSignalGateway.java index a411546..a8a1804 100644 --- a/call-module/src/main/java/com/github/lystran/mochat/call/websocket/CallSignalGateway.java +++ b/call-module/src/main/java/com/github/lystran/mochat/call/websocket/CallSignalGateway.java @@ -97,6 +97,7 @@ private String toJson(CallSignalMessage message) { "toUserId", message.toUserId(), "groupId", message.groupId(), "roomName", message.roomName(), + "callKind", message.callKind(), "timestampMillis", message.timestampMillis() )); } catch (JsonProcessingException e) { diff --git a/call-module/src/main/java/com/github/lystran/mochat/call/websocket/CallWebSocket.java b/call-module/src/main/java/com/github/lystran/mochat/call/websocket/CallWebSocket.java index f0aa14d..0f69987 100644 --- a/call-module/src/main/java/com/github/lystran/mochat/call/websocket/CallWebSocket.java +++ b/call-module/src/main/java/com/github/lystran/mochat/call/websocket/CallWebSocket.java @@ -76,7 +76,7 @@ public void onMessage(String json, WebSocketSession session) { request.type(), request.roomName() ); - if ("call_accept".equals(request.type()) && result.delivered()) { + if ("call_accept".equals(request.type()) && result.token() != null) { sendJson(session, Map.of( "type", "call_accepted_with_token", "token", result.token(), diff --git a/call-service-app/build.gradle.kts b/call-service-app/build.gradle.kts index 64d07a4..a36ea62 100644 --- a/call-service-app/build.gradle.kts +++ b/call-service-app/build.gradle.kts @@ -3,6 +3,16 @@ plugins { } val jacksonVersion = "2.18.3" +val nettyVersion = "4.2.2.Final" + +configurations.configureEach { + resolutionStrategy.eachDependency { + if (requested.group == "io.netty") { + useVersion(nettyVersion) + because("Micronaut 4.9 WebSocket compression expects a consistent Netty 4.2 API surface.") + } + } +} dependencies { implementation(project(":service-runtime")) diff --git a/call-service-app/src/main/resources/application.yml b/call-service-app/src/main/resources/application.yml index 8046dda..e167722 100644 --- a/call-service-app/src/main/resources/application.yml +++ b/call-service-app/src/main/resources/application.yml @@ -4,6 +4,20 @@ micronaut: server: host: ${MOCHAT_CALL_SERVICE_HTTP_HOST:0.0.0.0} port: ${MOCHAT_CALL_SERVICE_HTTP_PORT:8090} + cors: + enabled: true + configurations: + web: + allowed-origins: + - '*' + allowed-methods: + - GET + - POST + - PUT + - DELETE + - OPTIONS + allowed-headers: + - '*' mochat: flyway: diff --git a/call-service-app/src/test/java/com/github/lystran/mochat/callservice/CallServiceHttpTest.java b/call-service-app/src/test/java/com/github/lystran/mochat/callservice/CallServiceHttpTest.java index 4101930..cc9bcfc 100644 --- a/call-service-app/src/test/java/com/github/lystran/mochat/callservice/CallServiceHttpTest.java +++ b/call-service-app/src/test/java/com/github/lystran/mochat/callservice/CallServiceHttpTest.java @@ -287,8 +287,9 @@ void callServiceInvitePrivateCallReturnsCallIdAndRoomName() { assertNotNull(result.roomName()); assertEquals(fromUserId, result.fromUserId()); assertEquals(toUserId, result.toUserId()); - // 接收方不在线时 token 为 null,房间会被结束 - // 在线时才会签发 token + assertNotNull(result.token()); + assertNotNull(result.livekitUrl()); + // 接收方不在线时也保留房间并签发发起方 token,等待离线邀请被接收方重连后消费。 } @Test @@ -333,6 +334,21 @@ CallRelationshipService callRelationshipService() { return Mockito.mock(CallRelationshipService.class); } + @Singleton + @Primary + CallTokenService callTokenService() { + CallTokenService mock = Mockito.mock(CallTokenService.class); + when(mock.issueToken(anyLong(), anyString())).thenReturn("test-livekit-token"); + when(mock.livekitUrl()).thenReturn("wss://test-livekit.example"); + return mock; + } + + @Singleton + @Primary + CallOfflineNotificationService callOfflineNotificationService() { + return Mockito.mock(CallOfflineNotificationService.class); + } + @Singleton @Primary RedisCommands redisCommands() { diff --git a/logic-module/src/main/java/com/github/lystran/mochat/logic/http/GroupsController.java b/logic-module/src/main/java/com/github/lystran/mochat/logic/http/GroupsController.java index 58e7803..a217957 100644 --- a/logic-module/src/main/java/com/github/lystran/mochat/logic/http/GroupsController.java +++ b/logic-module/src/main/java/com/github/lystran/mochat/logic/http/GroupsController.java @@ -98,6 +98,27 @@ public HttpResponse kickMember(long groupId, long memberUserId, @QueryValue S } } + /** + * 由群主直接拉好友入群。 + */ + @Post("/{groupId}/members") + public HttpResponse inviteMember(long groupId, @Body InviteGroupMemberRequest request) { + if (request == null) { + return HttpResponse.badRequest(Map.of("message", "request body is required")); + } + var requesterUid = sessionService.resolveUserId(request.sessionId()); + if (requesterUid.isEmpty()) { + return unauthorized(); + } + + try { + GroupsService.GroupMemberMutationSummary summary = groupsService.inviteMember(requesterUid.get(), groupId, request.memberUserId()); + return HttpResponse.ok(new GroupMemberMutationResponse(summary.groupId(), summary.userId(), summary.status())); + } catch (IllegalArgumentException exception) { + return HttpResponse.badRequest(Map.of("message", exception.getMessage())); + } + } + /** * 由群主解散整个群。 */ @@ -221,6 +242,10 @@ public record CreateGroupRequest(String sessionId, String name) { public record JoinGroupRequest(String sessionId, String sign) { } + /** 群主拉好友入群的请求体。 */ + public record InviteGroupMemberRequest(String sessionId, long memberUserId) { + } + /** 处理入群申请的请求体,`action` 只能是 accept 或 reject。 */ public record HandleGroupJoinRequest(String sessionId, String action) { } diff --git a/logic-module/src/main/java/com/github/lystran/mochat/logic/http/TextMessagesController.java b/logic-module/src/main/java/com/github/lystran/mochat/logic/http/TextMessagesController.java new file mode 100644 index 0000000..aff810b --- /dev/null +++ b/logic-module/src/main/java/com/github/lystran/mochat/logic/http/TextMessagesController.java @@ -0,0 +1,330 @@ +package com.github.lystran.mochat.logic.http; + +import com.github.lystran.mochat.common.id.IdGenerator; +import com.github.lystran.mochat.logic.service.ConversationStateService; +import com.github.lystran.mochat.logic.service.SessionService; +import io.micronaut.context.annotation.Requires; +import io.micronaut.core.annotation.Nullable; +import io.micronaut.http.HttpResponse; +import io.micronaut.http.HttpStatus; +import io.micronaut.http.annotation.Body; +import io.micronaut.http.annotation.Controller; +import io.micronaut.http.annotation.Get; +import io.micronaut.http.annotation.Post; +import io.micronaut.http.annotation.QueryValue; +import jakarta.inject.Singleton; + +import javax.sql.DataSource; +import java.nio.charset.StandardCharsets; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Clock; +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Desktop-friendly text messaging endpoints used by the local Electron client while + * the full access-gateway/message-service realtime path is not available locally. + */ +@Singleton +@Controller("/messages") +@Requires(beans = DataSource.class) +public final class TextMessagesController { + private static final int DEFAULT_LIMIT = 50; + private static final int MAX_LIMIT = 100; + + private static final String LIST_MESSAGES_SQL = """ + SELECT seq, msg_id, conversation_id, sender_uid, server_ts_ms, payload_base64, message_type + FROM messages + WHERE conversation_id = ? + AND seq > ? + ORDER BY seq ASC + LIMIT ? + """; + private static final String LOCK_CONVERSATION_SQL = """ + SELECT type, latest_seq + FROM conversations + WHERE id = ? + FOR UPDATE + """; + private static final String FIND_PRIVATE_CONVERSATION_SQL = """ + SELECT uid_1, uid_2, status + FROM user_friendships + WHERE id = ? + """; + private static final String INSERT_PRIVATE_MESSAGE_SQL = """ + INSERT INTO messages ( + msg_id, conversation_id, seq, client_msg_id, kind, sender_uid, + peer_uid_low, peer_uid_high, group_id, server_ts_ms, payload_base64, message_type + ) + VALUES (?, ?, ?, ?, 'private', ?, ?, ?, NULL, ?, ?, 'text') + """; + private static final String INSERT_GROUP_MESSAGE_SQL = """ + INSERT INTO messages ( + msg_id, conversation_id, seq, client_msg_id, kind, sender_uid, + peer_uid_low, peer_uid_high, group_id, server_ts_ms, payload_base64, message_type + ) + VALUES (?, ?, ?, ?, 'group', ?, NULL, NULL, ?, ?, ?, 'text') + """; + private static final String UPDATE_CONVERSATION_SQL = """ + UPDATE conversations + SET latest_seq = ?, latest_message_time = ?, updated_at = now() + WHERE id = ? + """; + + private final DataSource dataSource; + private final IdGenerator idGenerator; + private final SessionService sessionService; + private final ConversationStateService conversationStateService; + private final Clock clock = Clock.systemUTC(); + + public TextMessagesController( + DataSource dataSource, + IdGenerator idGenerator, + SessionService sessionService, + ConversationStateService conversationStateService + ) { + this.dataSource = Objects.requireNonNull(dataSource, "dataSource"); + this.idGenerator = Objects.requireNonNull(idGenerator, "idGenerator"); + this.sessionService = Objects.requireNonNull(sessionService, "sessionService"); + this.conversationStateService = Objects.requireNonNull(conversationStateService, "conversationStateService"); + } + + @Get + public HttpResponse list( + @QueryValue String sessionId, + long conversationId, + @Nullable @QueryValue Long afterSeq, + @Nullable @QueryValue Integer limit + ) { + var requesterUid = sessionService.resolveUserId(sessionId); + if (requesterUid.isEmpty()) { + return unauthorized(); + } + if (!conversationStateService.hasConversationAccess(conversationId, requesterUid.get())) { + return HttpResponse.notFound(); + } + + int resolvedLimit = limit == null ? DEFAULT_LIMIT : Math.min(Math.max(limit, 1), MAX_LIMIT); + try (Connection connection = dataSource.getConnection(); + PreparedStatement statement = connection.prepareStatement(LIST_MESSAGES_SQL)) { + statement.setLong(1, conversationId); + statement.setLong(2, afterSeq == null ? 0L : afterSeq); + statement.setInt(3, resolvedLimit); + return HttpResponse.ok(new MessagesResponse(mapMessages(statement.executeQuery()))); + } catch (SQLException sqlException) { + throw new IllegalStateException("failed to list messages", sqlException); + } + } + + @Post("/send-text/private") + public HttpResponse sendPrivate(@Body SendPrivateTextRequest request) { + if (request == null) { + return HttpResponse.badRequest(Map.of("message", "request body is required")); + } + return sendText(request.sessionId(), request.conversationId(), request.text(), "private"); + } + + @Post("/send-text/group") + public HttpResponse sendGroup(@Body SendGroupTextRequest request) { + if (request == null) { + return HttpResponse.badRequest(Map.of("message", "request body is required")); + } + return sendText(request.sessionId(), request.conversationId(), request.text(), "group"); + } + + private HttpResponse sendText(String sessionId, long conversationId, String text, String expectedKind) { + var requesterUid = sessionService.resolveUserId(sessionId); + if (requesterUid.isEmpty()) { + return unauthorized(); + } + if (text == null || text.trim().isEmpty()) { + return HttpResponse.badRequest(Map.of("message", "text is required")); + } + if (!conversationStateService.hasConversationAccess(conversationId, requesterUid.get())) { + return HttpResponse.notFound(); + } + + try (Connection connection = dataSource.getConnection()) { + connection.setAutoCommit(false); + try { + ConversationLock conversation = lockConversation(connection, conversationId); + if (!expectedKind.equals(conversation.kind())) { + connection.rollback(); + return HttpResponse.badRequest(Map.of("message", "conversation kind mismatch")); + } + + long msgId = idGenerator.nextId(); + long clientMsgId = idGenerator.nextId(); + long seq = conversation.latestSeq() + 1L; + long serverTimeMs = clock.millis(); + String payloadBase64 = Base64.getEncoder().encodeToString(text.trim().getBytes(StandardCharsets.UTF_8)); + + if ("private".equals(expectedKind)) { + PrivateRoute route = findPrivateRoute(connection, conversationId, requesterUid.get()); + if (route == null) { + connection.rollback(); + return HttpResponse.notFound(); + } + insertPrivateMessage(connection, msgId, conversationId, seq, clientMsgId, requesterUid.get(), route, serverTimeMs, payloadBase64); + } else { + insertGroupMessage(connection, msgId, conversationId, seq, clientMsgId, requesterUid.get(), serverTimeMs, payloadBase64); + } + updateConversation(connection, conversationId, seq, serverTimeMs); + connection.commit(); + + return HttpResponse.ok(new SendTextResponse( + new MessageItem(seq, msgId, conversationId, requesterUid.get(), serverTimeMs, text.trim(), "text") + )); + } catch (RuntimeException | SQLException exception) { + connection.rollback(); + throw exception; + } finally { + connection.setAutoCommit(true); + } + } catch (SQLException sqlException) { + throw new IllegalStateException("failed to send text message", sqlException); + } + } + + private ConversationLock lockConversation(Connection connection, long conversationId) throws SQLException { + try (PreparedStatement statement = connection.prepareStatement(LOCK_CONVERSATION_SQL)) { + statement.setLong(1, conversationId); + try (ResultSet resultSet = statement.executeQuery()) { + if (!resultSet.next()) { + throw new IllegalArgumentException("conversation not found"); + } + int type = resultSet.getInt(1); + return new ConversationLock(type == 0 ? "private" : "group", resultSet.getLong(2)); + } + } + } + + private PrivateRoute findPrivateRoute(Connection connection, long conversationId, long requesterUid) throws SQLException { + try (PreparedStatement statement = connection.prepareStatement(FIND_PRIVATE_CONVERSATION_SQL)) { + statement.setLong(1, conversationId); + try (ResultSet resultSet = statement.executeQuery()) { + if (!resultSet.next()) { + return null; + } + long uid1 = resultSet.getLong(1); + long uid2 = resultSet.getLong(2); + if (!"ok".equals(resultSet.getString(3)) || (requesterUid != uid1 && requesterUid != uid2)) { + return null; + } + return new PrivateRoute(uid1, uid2); + } + } + } + + private void insertPrivateMessage( + Connection connection, + long msgId, + long conversationId, + long seq, + long clientMsgId, + long senderUid, + PrivateRoute route, + long serverTimeMs, + String payloadBase64 + ) throws SQLException { + try (PreparedStatement statement = connection.prepareStatement(INSERT_PRIVATE_MESSAGE_SQL)) { + statement.setLong(1, msgId); + statement.setLong(2, conversationId); + statement.setLong(3, seq); + statement.setLong(4, clientMsgId); + statement.setLong(5, senderUid); + statement.setLong(6, route.uidLow()); + statement.setLong(7, route.uidHigh()); + statement.setLong(8, serverTimeMs); + statement.setString(9, payloadBase64); + statement.executeUpdate(); + } + } + + private void insertGroupMessage( + Connection connection, + long msgId, + long conversationId, + long seq, + long clientMsgId, + long senderUid, + long serverTimeMs, + String payloadBase64 + ) throws SQLException { + try (PreparedStatement statement = connection.prepareStatement(INSERT_GROUP_MESSAGE_SQL)) { + statement.setLong(1, msgId); + statement.setLong(2, conversationId); + statement.setLong(3, seq); + statement.setLong(4, clientMsgId); + statement.setLong(5, senderUid); + statement.setLong(6, conversationId); + statement.setLong(7, serverTimeMs); + statement.setString(8, payloadBase64); + statement.executeUpdate(); + } + } + + private void updateConversation(Connection connection, long conversationId, long seq, long serverTimeMs) throws SQLException { + try (PreparedStatement statement = connection.prepareStatement(UPDATE_CONVERSATION_SQL)) { + statement.setLong(1, seq); + statement.setLong(2, serverTimeMs); + statement.setLong(3, conversationId); + statement.executeUpdate(); + } + } + + private static List mapMessages(ResultSet resultSet) throws SQLException { + List messages = new ArrayList<>(); + while (resultSet.next()) { + messages.add(new MessageItem( + resultSet.getLong(1), + resultSet.getLong(2), + resultSet.getLong(3), + resultSet.getLong(4), + resultSet.getLong(5), + decodeText(resultSet.getString(6)), + resultSet.getString(7) + )); + } + return messages; + } + + private static String decodeText(String payloadBase64) { + try { + return new String(Base64.getDecoder().decode(payloadBase64), StandardCharsets.UTF_8); + } catch (IllegalArgumentException exception) { + return ""; + } + } + + private static HttpResponse> unauthorized() { + return HttpResponse.status(HttpStatus.UNAUTHORIZED).body(Map.of("message", "session invalid")); + } + + public record SendPrivateTextRequest(String sessionId, long conversationId, String text) { + } + + public record SendGroupTextRequest(String sessionId, long conversationId, String text) { + } + + public record SendTextResponse(MessageItem message) { + } + + public record MessagesResponse(List items) { + } + + public record MessageItem(long seq, long msgId, long conversationId, long senderUserId, long serverTimeMs, String text, String messageType) { + } + + private record ConversationLock(String kind, long latestSeq) { + } + + private record PrivateRoute(long uidLow, long uidHigh) { + } +} diff --git a/logic-module/src/main/java/com/github/lystran/mochat/logic/repository/GroupRepository.java b/logic-module/src/main/java/com/github/lystran/mochat/logic/repository/GroupRepository.java index 41e87ec..4d20f0b 100644 --- a/logic-module/src/main/java/com/github/lystran/mochat/logic/repository/GroupRepository.java +++ b/logic-module/src/main/java/com/github/lystran/mochat/logic/repository/GroupRepository.java @@ -26,6 +26,11 @@ public interface GroupRepository { */ void kickMember(long ownerUserId, long groupId, long memberUserId); + /** + * 由群主直接邀请好友入群。 + */ + void inviteMember(long ownerUserId, long groupId, long memberUserId); + /** * 解散一个群。 */ diff --git a/logic-module/src/main/java/com/github/lystran/mochat/logic/repository/InMemoryGroupRepository.java b/logic-module/src/main/java/com/github/lystran/mochat/logic/repository/InMemoryGroupRepository.java index c74a96b..fadacb0 100644 --- a/logic-module/src/main/java/com/github/lystran/mochat/logic/repository/InMemoryGroupRepository.java +++ b/logic-module/src/main/java/com/github/lystran/mochat/logic/repository/InMemoryGroupRepository.java @@ -44,6 +44,14 @@ public void kickMember(long ownerUserId, long groupId, long memberUserId) { throw new UnsupportedOperationException("group membership mutation requires datasource-backed repository"); } + /** + * 内存兜底模式不支持拉人入群。 + */ + @Override + public void inviteMember(long ownerUserId, long groupId, long memberUserId) { + throw new UnsupportedOperationException("group membership mutation requires datasource-backed repository"); + } + /** * 内存兜底模式不支持解散群。 */ diff --git a/logic-module/src/main/java/com/github/lystran/mochat/logic/repository/JdbcGroupRepository.java b/logic-module/src/main/java/com/github/lystran/mochat/logic/repository/JdbcGroupRepository.java index 55c561f..e2737d7 100644 --- a/logic-module/src/main/java/com/github/lystran/mochat/logic/repository/JdbcGroupRepository.java +++ b/logic-module/src/main/java/com/github/lystran/mochat/logic/repository/JdbcGroupRepository.java @@ -85,6 +85,15 @@ SELECT EXISTS ( AND status = 'active' ) """; + private static final String HAS_ACTIVE_FRIENDSHIP_SQL = """ + SELECT EXISTS ( + SELECT 1 + FROM user_friendships + WHERE uid_1 = LEAST(?, ?) + AND uid_2 = GREATEST(?, ?) + AND status = 'ok' + ) + """; private static final String INSERT_JOIN_REQUEST_SQL = """ INSERT INTO group_join_requests (id, group_id, from_uid, sign, status) VALUES (?, ?, ?, ?, 'pending') @@ -270,6 +279,34 @@ public void kickMember(long ownerUserId, long groupId, long memberUserId) { } } + /** + * 由群主直接把自己的好友拉进群。 + */ + @Override + public void inviteMember(long ownerUserId, long groupId, long memberUserId) { + try (Connection connection = dataSource.getConnection()) { + connection.setAutoCommit(false); + try { + requireOwner(connection, ownerUserId, groupId); + if (hasActiveMembership(connection, groupId, memberUserId)) { + throw new IllegalArgumentException("group member already active"); + } + if (!hasActiveFriendship(connection, ownerUserId, memberUserId)) { + throw new IllegalArgumentException("invited user must be an active friend"); + } + upsertMemberMembership(connection, groupId, memberUserId); + connection.commit(); + } catch (RuntimeException | SQLException exception) { + connection.rollback(); + throw exception; + } finally { + connection.setAutoCommit(true); + } + } catch (SQLException sqlException) { + throw new IllegalStateException("failed to invite group member", sqlException); + } + } + /** * 解散一个群,并把当前还活跃的成员关系一并收口。 */ @@ -462,6 +499,22 @@ private boolean hasActiveMembership(Connection connection, long groupId, long us } } + /** + * 判断两人当前是否是好友。 + */ + private boolean hasActiveFriendship(Connection connection, long firstUserId, long secondUserId) throws SQLException { + try (PreparedStatement statement = connection.prepareStatement(HAS_ACTIVE_FRIENDSHIP_SQL)) { + statement.setLong(1, firstUserId); + statement.setLong(2, secondUserId); + statement.setLong(3, firstUserId); + statement.setLong(4, secondUserId); + try (ResultSet resultSet = statement.executeQuery()) { + resultSet.next(); + return resultSet.getBoolean(1); + } + } + } + /** * 读取并锁住一条入群申请。 */ diff --git a/logic-module/src/main/java/com/github/lystran/mochat/logic/service/GroupsService.java b/logic-module/src/main/java/com/github/lystran/mochat/logic/service/GroupsService.java index 9e09d6f..8c6fcd7 100644 --- a/logic-module/src/main/java/com/github/lystran/mochat/logic/service/GroupsService.java +++ b/logic-module/src/main/java/com/github/lystran/mochat/logic/service/GroupsService.java @@ -61,6 +61,20 @@ public GroupMemberMutationSummary kickMember(long ownerUserId, long groupId, lon return new GroupMemberMutationSummary(groupId, memberUserId, "kicked"); } + /** + * 由群主把自己的好友拉进群。 + */ + public GroupMemberMutationSummary inviteMember(long ownerUserId, long groupId, long memberUserId) { + requirePositive(ownerUserId, "ownerUserId"); + requirePositive(groupId, "groupId"); + requirePositive(memberUserId, "memberUserId"); + if (ownerUserId == memberUserId) { + throw new IllegalArgumentException("memberUserId must differ from ownerUserId"); + } + groupRepository.inviteMember(ownerUserId, groupId, memberUserId); + return new GroupMemberMutationSummary(groupId, memberUserId, "active"); + } + /** * 解散一个群。 */ diff --git a/service-runtime/build.gradle.kts b/service-runtime/build.gradle.kts index 752ea74..6935b36 100644 --- a/service-runtime/build.gradle.kts +++ b/service-runtime/build.gradle.kts @@ -5,6 +5,8 @@ plugins { dependencies { annotationProcessor("io.micronaut:micronaut-inject-java:4.9.0") implementation("io.micronaut:micronaut-runtime:4.9.0") + implementation("io.micronaut:micronaut-http:4.9.0") + implementation("io.micronaut:micronaut-core-reactive:4.9.0") implementation("jakarta.inject:jakarta.inject-api:2.0.1") testAnnotationProcessor("io.micronaut:micronaut-inject-java:4.9.0") diff --git a/service-runtime/src/main/java/com/github/lystran/mochat/runtime/http/DesktopCorsFilter.java b/service-runtime/src/main/java/com/github/lystran/mochat/runtime/http/DesktopCorsFilter.java new file mode 100644 index 0000000..d254db3 --- /dev/null +++ b/service-runtime/src/main/java/com/github/lystran/mochat/runtime/http/DesktopCorsFilter.java @@ -0,0 +1,35 @@ +package com.github.lystran.mochat.runtime.http; + +import io.micronaut.core.async.publisher.Publishers; +import io.micronaut.http.HttpHeaders; +import io.micronaut.http.HttpMethod; +import io.micronaut.http.HttpRequest; +import io.micronaut.http.HttpResponse; +import io.micronaut.http.MutableHttpResponse; +import io.micronaut.http.annotation.Filter; +import io.micronaut.http.filter.HttpServerFilter; +import io.micronaut.http.filter.ServerFilterChain; +import org.reactivestreams.Publisher; + +/** Allows the Electron desktop renderer to call local service HTTP APIs. */ +@Filter("/**") +public final class DesktopCorsFilter implements HttpServerFilter { + private static final String ALLOWED_METHODS = "GET,POST,PUT,DELETE,OPTIONS"; + private static final String ALLOWED_HEADERS = "Content-Type,Accept,Origin"; + + @Override + public Publisher> doFilter(HttpRequest request, ServerFilterChain chain) { + if (request.getMethod() == HttpMethod.OPTIONS) { + return Publishers.just(withCorsHeaders(HttpResponse.noContent())); + } + return Publishers.map(chain.proceed(request), response -> withCorsHeaders(response)); + } + + private MutableHttpResponse withCorsHeaders(MutableHttpResponse response) { + return response + .header(HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN, "*") + .header(HttpHeaders.ACCESS_CONTROL_ALLOW_METHODS, ALLOWED_METHODS) + .header(HttpHeaders.ACCESS_CONTROL_ALLOW_HEADERS, ALLOWED_HEADERS) + .header(HttpHeaders.ACCESS_CONTROL_MAX_AGE, "3600"); + } +}