Skip to content
4 changes: 4 additions & 0 deletions api-service-app/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ dependencies {
implementation(project(":common"))
implementation(project(":infra-redis"))
implementation(project(":logic-module"))
implementation(project(":persistence-module"))
implementation(project(":protocol"))
implementation(project(":multimedia-module"))
implementation("io.lettuce:lettuce-core:6.7.1.RELEASE")
Expand All @@ -16,6 +17,9 @@ dependencies {
implementation("io.micronaut:micronaut-runtime:4.9.0")
implementation("io.micronaut:micronaut-http-server-netty:4.9.0")
implementation("io.micronaut:micronaut-jackson-databind:4.9.0")
implementation("org.flywaydb:flyway-core:10.20.1")
implementation("org.flywaydb:flyway-database-postgresql:10.20.1")
implementation("org.postgresql:postgresql:42.7.5")
runtimeOnly("ch.qos.logback:logback-classic:1.5.18")
runtimeOnly("org.yaml:snakeyaml:2.4")

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.github.lystran.mochat.apiservice.runtime;

import com.github.lystran.mochat.common.event.EventBus;
import com.github.lystran.mochat.common.id.IdGenerator;
import com.github.lystran.mochat.common.offline.OfflineQueue;
import com.github.lystran.mochat.infra.redis.RedisEventBus;
import com.github.lystran.mochat.infra.redis.RedisOfflineQueue;
Expand All @@ -12,13 +13,57 @@
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Property;
import io.micronaut.context.annotation.Requires;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Singleton;
import org.flywaydb.core.Flyway;
import org.postgresql.ds.PGSimpleDataSource;

import javax.sql.DataSource;

/**
* 按配置创建 `api-service` 运行时要用到的 Redis 相关组件
* 按配置创建 `api-service` 运行时要用到的基础组件
*/
@Factory
public final class ApiServiceRuntimeFactory {
private static final long CUSTOM_EPOCH_MILLIS = 1_704_067_200_000L;
private static final int WORKER_ID_BITS = 10;
private static final int SEQUENCE_BITS = 12;
private static final long MAX_WORKER_ID = (1L << WORKER_ID_BITS) - 1;
private static final long MAX_SEQUENCE = (1L << SEQUENCE_BITS) - 1;
private static final int TIMESTAMP_SHIFT = WORKER_ID_BITS + SEQUENCE_BITS;
private static final int WORKER_ID_SHIFT = SEQUENCE_BITS;

/**
* 在启用 PostgreSQL 依赖时创建数据源。
*/
@Singleton
@Requires(property = "mochat.api-service.dependencies.postgres-enabled", notEquals = "false", defaultValue = "true")
@Requires(missingBeans = DataSource.class)
DataSource dataSource(
@Property(name = "mochat.postgres.url") String url,
@Property(name = "mochat.postgres.username") String username,
@Property(name = "mochat.postgres.password") String password
) {
PGSimpleDataSource dataSource = new PGSimpleDataSource();
dataSource.setURL(url);
dataSource.setUser(username);
dataSource.setPassword(password);
return dataSource;
}

/**
* 创建 Flyway,用来管理 api-service 依赖的社交关系读写表结构。
*/
@Singleton
@Requires(bean = DataSource.class)
@Requires(missingBeans = Flyway.class)
Flyway flyway(DataSource dataSource, @Property(name = "mochat.flyway.locations") String flywayLocations) {
return Flyway.configure()
.dataSource(dataSource)
.locations(flywayLocations)
.load();
}

/**
* 在启用 Redis 依赖时创建 Redis 客户端。
*/
Expand Down Expand Up @@ -81,4 +126,70 @@ EventBus eventBus(
OfflineQueue offlineQueue(RedisCommands<String, String> redisCommands) {
return new RedisOfflineQueue(redisCommands);
}

/**
* 创建 api-service 用于用户、好友、群组等业务写入的全局 id 生成器。
*/
@Singleton
@Requires(missingBeans = IdGenerator.class)
IdGenerator idGenerator(@Property(name = "mochat.api-service.id.worker-id", defaultValue = "2") long workerId) {
return new ApiServiceSnowflakeIdGenerator(workerId);
}

private static final class ApiServiceSnowflakeIdGenerator implements IdGenerator {
private final long workerId;
private long lastTimestamp = -1L;
private long sequence;

ApiServiceSnowflakeIdGenerator(long workerId) {
if (workerId < 0 || workerId > MAX_WORKER_ID) {
throw new IllegalArgumentException("workerId must be between 0 and " + MAX_WORKER_ID);
}
this.workerId = workerId;
}

@Override
public synchronized long nextId() {
long timestamp = System.currentTimeMillis();
if (timestamp < lastTimestamp) {
throw new IllegalStateException("system clock moved backwards");
}
if (timestamp == lastTimestamp) {
sequence = (sequence + 1) & MAX_SEQUENCE;
if (sequence == 0L) {
timestamp = waitForNextMillis(lastTimestamp);
}
} else {
sequence = 0L;
}
lastTimestamp = timestamp;
return ((timestamp - CUSTOM_EPOCH_MILLIS) << TIMESTAMP_SHIFT)
| (workerId << WORKER_ID_SHIFT)
| sequence;
}

private long waitForNextMillis(long previousTimestamp) {
long timestamp = System.currentTimeMillis();
while (timestamp <= previousTimestamp) {
timestamp = System.currentTimeMillis();
}
return timestamp;
}
}
}

@Singleton
@Requires(bean = Flyway.class)
@Requires(property = "mochat.api-service.flyway.migrate-on-start", value = "true", defaultValue = "true")
final class ApiServiceFlywayMigrationBootstrap {
private final Flyway flyway;

ApiServiceFlywayMigrationBootstrap(Flyway flyway) {
this.flyway = flyway;
}

@PostConstruct
void migrate() {
flyway.migrate();
}
}
24 changes: 24 additions & 0 deletions api-service-app/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,28 @@ micronaut:
server:
host: ${MOCHAT_API_SERVICE_HTTP_HOST:0.0.0.0}
port: ${MOCHAT_API_SERVICE_HTTP_PORT:8080}
cors:
enabled: true
configurations:
web:
allowed-origins:
- '*'
allowed-methods:
- GET
- POST
- PUT
- DELETE
- OPTIONS
allowed-headers:
- '*'

mochat:
flyway:
locations: classpath:db/migration
postgres:
url: ${MOCHAT_POSTGRES_URL:`jdbc:postgresql://127.0.0.1:15432/mochat`}
username: ${MOCHAT_POSTGRES_USERNAME:mochat}
password: ${MOCHAT_POSTGRES_PASSWORD:mochat}
redis:
uri: '${MOCHAT_REDIS_URI:`redis://127.0.0.1:6379`}'
api-service:
Expand All @@ -14,6 +34,10 @@ mochat:
port: ${MOCHAT_API_SERVICE_HTTP_PORT:8080}
grpc:
port: ${MOCHAT_API_SERVICE_GRPC_PORT:19091}
flyway:
migrate-on-start: ${MOCHAT_API_SERVICE_FLYWAY_MIGRATE_ON_START:true}
id:
worker-id: ${MOCHAT_API_SERVICE_ID_WORKER_ID:2}
dependencies:
redis-enabled: ${MOCHAT_API_SERVICE_REDIS_ENABLED:true}
postgres-enabled: ${MOCHAT_API_SERVICE_POSTGRES_ENABLED:true}
Expand Down
14 changes: 14 additions & 0 deletions api-service-app/src/test/resources/application.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
micronaut:
application:
name: api-service

mochat:
redis:
uri: redis://127.0.0.1:6379
api-service:
dependencies:
redis-enabled: true
postgres-enabled: false
message-service:
inbound-consumer:
enabled: false
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,25 @@ public HttpResponse<?> invitePrivateCall(@Body PrivateCallInviteRequest request)
return unauthorized();
}
try {
return HttpResponse.ok(ApiResponse.ok(callService.invitePrivateCall(userId.get(), request.toUserId())));
return HttpResponse.ok(ApiResponse.ok(callService.invitePrivateCall(userId.get(), request.toUserId(), request.callKind())));
} catch (IllegalArgumentException | IllegalStateException exception) {
return HttpResponse.badRequest(ApiResponse.error(exception.getMessage()));
}
}

@Post("/private/signal")
public HttpResponse<?> signalPrivateCall(@Body PrivateCallSignalRequest request) {
var userId = resolveSession(request.sessionId());
if (userId.isEmpty()) {
return unauthorized();
}
try {
return HttpResponse.ok(ApiResponse.ok(callService.forwardPrivateSignal(
userId.get(),
request.toUserId(),
request.type(),
request.roomName()
)));
} catch (IllegalArgumentException | IllegalStateException exception) {
return HttpResponse.badRequest(ApiResponse.error(exception.getMessage()));
}
Expand Down Expand Up @@ -97,7 +115,10 @@ public static <T> ApiResponse<T> error(String message) {
}
}

public record PrivateCallInviteRequest(String sessionId, long toUserId) {
public record PrivateCallInviteRequest(String sessionId, long toUserId, String callKind) {
}

public record PrivateCallSignalRequest(String sessionId, long toUserId, String type, String roomName) {
}

public record GroupCallStartRequest(String sessionId, long groupId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public record CallSignalMessage(
long toUserId,
long groupId,
String roomName,
String callKind,
long timestampMillis
) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.github.lystran.mochat.call.dto.CallSignalMessage;

import io.micronaut.context.annotation.Property;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
Expand All @@ -17,6 +18,7 @@

/** 群通话离线通知的 MQ 生产者。 */
@Singleton
@Requires(property = "mochat.call-service.dependencies.mq-enabled", notEquals = "false", defaultValue = "true")
public final class CallOfflineNotificationMqProducer {
public static final String DEFAULT_TOPIC = "mochat.call.offline-notifications";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/** 处理群通话离线通知的入库和上线补推。 */
@Singleton
Expand All @@ -41,15 +42,15 @@ public CallOfflineNotificationService(
CallRoomManager callRoomManager,
CallRelationshipService relationshipService,
CallSignalGateway signalGateway,
CallOfflineNotificationMqProducer mqProducer
Optional<CallOfflineNotificationMqProducer> mqProducer
) {
this.sqlSessionFactory = Objects.requireNonNull(sqlSessionFactory, "sqlSessionFactory");
this.objectMapper = Objects.requireNonNull(objectMapper, "objectMapper");
this.idGenerator = Objects.requireNonNull(idGenerator, "idGenerator");
this.callRoomManager = Objects.requireNonNull(callRoomManager, "callRoomManager");
this.relationshipService = Objects.requireNonNull(relationshipService, "relationshipService");
this.signalGateway = Objects.requireNonNull(signalGateway, "signalGateway");
this.mqProducer = Objects.requireNonNull(mqProducer, "mqProducer");
this.mqProducer = mqProducer.orElse(null);
}

public void enqueue(CallSignalMessage message) {
Expand All @@ -76,7 +77,7 @@ public void enqueueBatch(List<CallSignalMessage> messages) {
if (messages == null || messages.isEmpty()) {
return;
}
if (mqProducer.sendBatch(messages)) {
if (mqProducer != null && mqProducer.sendBatch(messages)) {
return;
}
// MQ 发送失败,同步兜底
Expand Down
Loading