Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
772d281
integrate dpdk loop
stepanrodimanov Mar 26, 2026
abd3dad
Merge branch 'protocols_between_controller_and_workers' into dpdk-int…
stepanrodimanov Mar 31, 2026
5bee3e4
merge traffic_filtering into dpdk-integration
stepanrodimanov Mar 31, 2026
203ae98
interrupt handler
stepanrodimanov Apr 1, 2026
b610905
fix linter
stepanrodimanov Apr 1, 2026
1e55242
Merge branch 'traffic_filtering' into dpdk-integration
stepanrodimanov Apr 2, 2026
2967fb0
fix bazel build
stepanrodimanov Apr 4, 2026
65eca24
Merge branch 'dpdk-integration' of https://github.com/moevm/grpc_serv…
stepanrodimanov Apr 4, 2026
8283bfb
add send to port in
LapshinAE0 Apr 12, 2026
c50b312
Merge changes dpdk
stepanrodimanov Apr 12, 2026
c770dce
integrate changes dpdk
stepanrodimanov Apr 12, 2026
601ae03
Merge branch 'traffic_filtering' into dpdk-integration
stepanrodimanov Apr 12, 2026
ea99a79
fix linter
stepanrodimanov Apr 12, 2026
5017313
full done
LapshinAE0 Apr 13, 2026
4c76460
add stdint
stepanrodimanov Apr 14, 2026
b4ee8e8
merge sqlite
stepanrodimanov Apr 14, 2026
2a8a6fb
fix build
stepanrodimanov Apr 14, 2026
cd0ecf6
fix linter
stepanrodimanov Apr 14, 2026
cac500e
fix build worker
stepanrodimanov Apr 15, 2026
b970eda
fix build worker
stepanrodimanov Apr 15, 2026
74a2853
fix: remove cache
stepanrodimanov Apr 22, 2026
abf0ff9
Merge branch 'traffic_filtering' into dpdk-integration
stepanrodimanov Apr 22, 2026
aa52eb6
feact: added classification request
stepanrodimanov Apr 22, 2026
997f0aa
fix: fix absolute paths
stepanrodimanov Apr 22, 2026
10248e9
fix: fix absolute paths in main.c
stepanrodimanov Apr 22, 2026
da6b922
fix: fix dockerfile
stepanrodimanov Apr 23, 2026
a06f32d
refactor: clang-format
stepanrodimanov Apr 23, 2026
7277099
fix: fix integration test
stepanrodimanov Apr 23, 2026
3cbf326
fix: fix logs
stepanrodimanov Apr 23, 2026
d88e507
fix: fix readme
stepanrodimanov Apr 23, 2026
32dd7c2
fix: fix test
stepanrodimanov Apr 23, 2026
06ae61f
fix: fix linter
stepanrodimanov Apr 23, 2026
115a994
fix: fix scripts
stepanrodimanov Apr 23, 2026
36d0f66
fix: fix imports
stepanrodimanov Apr 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion controller/service/internal/models/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (pl *ProviderList) LoadFromFile(filename string) error {
return fmt.Errorf("reading providers: %w", err)
}

interpolatedData := interpolateEnvVars(data)
interpolatedData := interpolateEnvVars(data)

return json.Unmarshal(data, pl)
}
Expand Down
4 changes: 2 additions & 2 deletions controller/service/internal/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type Service struct {

func NewService(categoryFile, providerFile string) (*Service, error) {

if err := godotenv.Load(); err != nil {
if err := godotenv.Load(); err != nil {
log.Println("Warning: .env file not found, using system environment variables")
}

Expand Down Expand Up @@ -75,7 +75,7 @@ func (s *Service) Check(checkValue string, endpointName string) ([]int, error) {
continue
}

resp.Body.Close()
resp.Body.Close()

if resp.StatusCode != http.StatusOK {
log.Printf("Provider %s returned %s", providerName, resp.Status)
Expand Down
2 changes: 1 addition & 1 deletion controller/service/tests/http_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,4 @@ func TestHttpClientRequest(t *testing.T) {
}
})
}
}
}
29 changes: 27 additions & 2 deletions controller/test/BUILD
Original file line number Diff line number Diff line change
@@ -1,11 +1,36 @@
load("@rules_go//go:def.bzl", "go_test")
load("@rules_proto//proto:defs.bzl", "proto_library")
load("@rules_go//proto:def.bzl", "go_proto_library")

proto_library(
name = "test_communication_proto",
srcs = ["communication.proto"],
visibility = ["//visibility:private"],
deps = [
"@protobuf//:struct_proto",
"@protobuf//:empty_proto",
],
)

go_proto_library(
name = "test_communication_go_proto",
compilers = ["@rules_go//proto:go_grpc"],
importpath = "github.com/moevm/grpc_server/controller/test",
proto = ":test_communication_proto",
visibility = ["//visibility:private"],
)

go_test(
name = "integration_test",
srcs = ["worker_test.go"],
deps = [
"//pkg/proto/communication:communication_go_proto",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//test/bufconn",
"@org_golang_google_grpc//credentials/insecure",
"@org_golang_google_protobuf//types/known/emptypb:go_default_library",
"@com_github_stretchr_testify//assert",
],
args = ["--test.v"],
args = ["-test.v"],
tags = ["exclusive"],
)
)
4 changes: 1 addition & 3 deletions controller/test/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,14 @@
### 1. Сборка компонентов

```bash
cd controller
bazel build //cmd/grpc_server:grpc_server

cd ../worker
bazel build //:worker
```

### 2. Запуск интеграционного теста

```bash
export TEST_CONTROLLER_ADDR="<YOUR_ADDR>" # например localhost:0
cd ../controller
./test/run.sh
```
197 changes: 126 additions & 71 deletions controller/test/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,152 +2,207 @@ package test

import (
"context"
"net"
"os"
"os/exec"
"path/filepath"
"testing"
"time"

pb "github.com/moevm/grpc_server/pkg/proto/communication"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
)

type MockController struct {
pb.UnimplementedDataServiceServer
Policy *pb.WorkerPolicy
t *testing.T
}

func (m *MockController) GetPolicy(ctx context.Context, req *pb.GetPolicyRequest) (*pb.GetPolicyResponse, error) {
m.t.Logf("GetPolicy called: worker_id=%d, version=%d", req.WorkerId, req.ConfigVersion)
return &pb.GetPolicyResponse{
Result: pb.GetPolicyResponse_POLICY_PROVIDED,
Policy: m.Policy,
}, nil
}

func (m *MockController) Classify(ctx context.Context, req *pb.ClassifyRequest) (*pb.ClassifyResponse, error) {
m.t.Logf("Classify called: worker_id=%d, domain=%s", req.WorkerId, req.Domain)
return &pb.ClassifyResponse{
Categories: []string{"news", "technology"},
TrustLevel: 3,
}, nil
}

func (m *MockController) SendStats(ctx context.Context, req *pb.StatsReport) (*emptypb.Empty, error) {
m.t.Logf("SendStats called: worker_id=%d", req.WorkerId)
return &emptypb.Empty{}, nil
}

func StartMockController(t *testing.T, policy *pb.WorkerPolicy) (string, func()) {
listenAddr := os.Getenv("TEST_CONTROLLER_ADDR")
if listenAddr == "" {
listenAddr = "localhost:0"
}

lis, err := net.Listen("tcp", listenAddr)
if err != nil {
t.Fatalf("Failed to listen on %s: %v", listenAddr, err)
}
if err != nil {
t.Fatalf("Failed to listen: %v", err)
}

s := grpc.NewServer()
mock := &MockController{
Policy: policy,
t: t,
}
pb.RegisterDataServiceServer(s, mock)

go func() {
if err := s.Serve(lis); err != nil {
t.Logf("Server error: %v", err)
}
}()

addr := lis.Addr().String()
cleanup := func() {
s.Stop()
lis.Close()
}
return addr, cleanup
}

func findProjectRoot() string {
dir, _ := os.Getwd()
for {
if _, err := os.Stat(filepath.Join(dir, "controller")); err != nil {
if _, err := os.Stat(filepath.Join(dir, "worker")); err != nil {
dir = filepath.Dir(dir)
continue
}
if _, err := os.Stat(filepath.Join(dir, "controller")); err == nil {
return dir
}
return dir
if _, err := os.Stat(filepath.Join(dir, "worker")); err == nil {
return dir
}
parent := filepath.Dir(dir)
if parent == dir {
return dir
}
dir = parent
}
}

func TestWorkerPolicyRequest(t *testing.T) {
func TestWorkerPolicyContent(t *testing.T) {
root := findProjectRoot()

ctrlBin := filepath.Join(root, "controller", "bazel-bin", "cmd", "grpc_server", "grpc_server_", "grpc_server")
workerBin := filepath.Join(root, "worker", "bazel-bin", "worker")

if _, err := os.Stat(ctrlBin); err != nil {
t.Skipf("Controller binary not found: %v", err)
}
if _, err := os.Stat(workerBin); err != nil {
t.Skipf("Worker binary not found: %v", err)
}

ctrl := exec.Command(ctrlBin)

if err := ctrl.Start(); err != nil {
t.Fatalf("Failed to start controller: %v", err)
testPolicy := &pb.WorkerPolicy{
ConfigVersion: 2,
MinTrustLevel: 2,
BlockCategories: []string{"CATEGORY_ONLINE_SHOPS", "CATEGORY_ANONYMIZERS", "CATEGORY_ALCOHOL"},
BlockDomains: []string{"1xbet.com"},
AllowDomains: []string{"github.com", "vk.com"},
BlockByTrust: map[string]int32{
"CATEGORY_MALWARE": 3,
"CATEGORY_BETTING": 4,
},
}

defer func() {
if err := ctrl.Process.Kill(); err != nil {
t.Logf("Warning: failed to kill controller: %v", err)
}
}()

time.Sleep(1 * time.Second)
addr, cleanup := StartMockController(t, testPolicy)
defer cleanup()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

worker := exec.CommandContext(ctx, workerBin)
worker.Env = []string{
"WORKER_ID=1",
"CONTROLLER_GRPC_ADDR=" + addr,
"METRICS_GATEWAY_ADDRESS=localhost",
"METRICS_GATEWAY_PORT=9091",
"TEST_REQUEST_POLICY=true",
}

output, err := worker.CombinedOutput()
assert.NoError(t, err, "Worker failed")
assert.Contains(t, string(output), "Worker 1 requests policy")
assert.Contains(t, string(output), "Policy received")
assert.NoError(t, err, "Worker failed: %s", string(output))

outputStr := string(output)

assert.Contains(t, outputStr, "Policy received")
assert.Contains(t, outputStr, "Min trust level: 2")
assert.Contains(t, outputStr, "Config version: 2")

assert.Contains(t, outputStr, "blocked_categories: CATEGORY_ONLINE_SHOPS")
assert.Contains(t, outputStr, "blocked_categories: CATEGORY_ANONYMIZERS")
assert.Contains(t, outputStr, "blocked_categories: CATEGORY_ALCOHOL")
assert.Contains(t, outputStr, "block_domains: 1xbet.com")
assert.Contains(t, outputStr, "allow_domains: github.com")
assert.Contains(t, outputStr, "allow_domains: vk.com")
}

func TestWorkerStatsReport(t *testing.T) {
func TestWorkerClassify(t *testing.T) {
root := findProjectRoot()

ctrlBin := filepath.Join(root, "controller", "bazel-bin", "cmd", "grpc_server", "grpc_server_", "grpc_server")
workerBin := filepath.Join(root, "worker", "bazel-bin", "worker")

if _, err := os.Stat(ctrlBin); err != nil {
t.Skipf("Controller binary not found: %v", err)
}
if _, err := os.Stat(workerBin); err != nil {
t.Skipf("Worker binary not found: %v", err)
}

ctrl := exec.Command(ctrlBin)

if err := ctrl.Start(); err != nil {
t.Fatalf("Failed to start controller: %v", err)
}

defer func() {
if err := ctrl.Process.Kill(); err != nil {
t.Logf("Warning: failed to kill controller: %v", err)
}
}()

time.Sleep(1 * time.Second)
addr, cleanup := StartMockController(t, nil)
defer cleanup()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

worker := exec.CommandContext(ctx, workerBin)
worker.Env = []string{
"WORKER_ID=1",
"CONTROLLER_GRPC_ADDR=" + addr,
"METRICS_GATEWAY_ADDRESS=localhost",
"METRICS_GATEWAY_PORT=9091",
"TEST_STATS=true",
"TEST_CLASSIFY_DOMAIN=example.com",
}

output, err := worker.CombinedOutput()
assert.NoError(t, err, "Worker failed")
assert.Contains(t, string(output), "Worker 1 send stats")
assert.Contains(t, string(output), "Policy received")
outputStr := string(output)
assert.NoError(t, err, "Worker failed: %s", string(output))
assert.Contains(t, outputStr, "Domain 'example.com' classified as categories [news, technology] with trust level 3")
}

func TestWorkerClassifyRequest(t *testing.T) {
func TestWorkerSendStats(t *testing.T) {
root := findProjectRoot()

ctrlBin := filepath.Join(root, "controller", "bazel-bin", "cmd", "grpc_server", "grpc_server_", "grpc_server")
workerBin := filepath.Join(root, "worker", "bazel-bin", "worker")

if _, err := os.Stat(ctrlBin); err != nil {
t.Skipf("Controller binary not found: %v", err)
}
if _, err := os.Stat(workerBin); err != nil {
t.Skipf("Worker binary not found: %v", err)
}

ctrl := exec.Command(ctrlBin)

if err := ctrl.Start(); err != nil {
t.Fatalf("Failed to start controller: %v", err)
}

defer func() {
if err := ctrl.Process.Kill(); err != nil {
t.Logf("Warning: failed to kill controller: %v", err)
}
}()

time.Sleep(1 * time.Second)
addr, cleanup := StartMockController(t, nil)
defer cleanup()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

worker := exec.CommandContext(ctx, workerBin)
worker.Env = []string{
"WORKER_ID=1",
"CONTROLLER_GRPC_ADDR=" + addr,
"METRICS_GATEWAY_ADDRESS=localhost",
"METRICS_GATEWAY_PORT=9091",
"TEST_CLASSIFY_DOMAIN=facebook.com",
"TEST_STATS=true",
}

output, err := worker.CombinedOutput()
assert.NoError(t, err, "Worker failed")
assert.Contains(t, string(output), "Domain 'facebook.com' classified as category")
assert.NoError(t, err, "Worker failed: %s", string(output))

outputStr := string(output)

assert.Contains(t, outputStr, "Stats sent successfully")

}
Loading
Loading