diff --git a/.idea/codeStyles/Project.xml b/.idea/codeStyles/Project.xml
index a17e6265..4c34f7c2 100644
--- a/.idea/codeStyles/Project.xml
+++ b/.idea/codeStyles/Project.xml
@@ -22,7 +22,6 @@
-
diff --git a/cli/Makefile b/cli/Makefile
index 1b938225..62980130 100644
--- a/cli/Makefile
+++ b/cli/Makefile
@@ -1,17 +1,31 @@
-TARGET := dphp-linux-*
-BIN_PATH := ../bin
-DOCKER_IMAGE := builder
-DOCKER_TARGET := cli-base-alpine
-BUILD_PATH := /go/src/app/cli/dist
+DEBUG ?= 0
-${BIN_PATH}/${TARGET}: cli.go */* go.mod build.sh build-php.sh ../Dockerfile
- mkdir -p ${BIN_PATH}
- cd .. && docker buildx build --pull --load --target ${DOCKER_TARGET} -t ${DOCKER_IMAGE} .
- docker create --name ${DOCKER_IMAGE} ${DOCKER_IMAGE} || ( docker rm -f ${DOCKER_IMAGE} && false )
- docker cp ${DOCKER_IMAGE}:${BUILD_PATH}/dphp ${BIN_PATH}/ || ( docker rm -f ${DOCKER_IMAGE} && false )
- docker rm -f ${DOCKER_IMAGE}
- upx -9 --force-pie ../bin/dphp-*
+PHP_INCLUDES := $(shell php-config --includes)
+PHP_LDFLAGS := $(shell php-config --ldflags)
+PHP_LIBS := $(shell php-config --libs)
-../dist: ${BIN_PATH}/${TARGET}
- docker create --name builder builder
- docker cp ${DOCKER_IMAGE}:${BUILD_PATH} ../dist
+ifeq ($(DEBUG),1)
+ XCADDY_FLAGS := -gcflags='all=-N -l' -tags=nobadger,nomysql,nopgx,nodphp,nobrotli
+else
+ XCADDY_FLAGS := -ldflags='-w -s' -tags=nobadger,nomysql,nopgx,nodphp,nobrotli
+endif
+
+LOCAL_MODULE := /home/withinboredom/code/durable-php/cli
+GEN_STUB := /home/withinboredom/code/php-src/build/gen_stub.php
+LOCAL_FRANKENPHP_CADDY := /home/withinboredom/code/frankenphp/caddy
+LOCAL_FRANKENPHP := /home/withinboredom/code/frankenphp
+
+# Targets
+frankenphp: ext/build/ext.go ext/build/ext_arginfo.h
+ CGO_ENABLED=1 \
+ XCADDY_GO_BUILD_FLAGS="$(XCADDY_FLAGS)" \
+ CGO_CFLAGS="$(PHP_INCLUDES)" \
+ CGO_LDFLAGS="$(PHP_LDFLAGS) $(PHP_LIBS)" \
+ xcaddy build \
+ --output frankenphp \
+ --with github.com/dunglas/frankenphp/caddy=$(LOCAL_FRANKENPHP_CADDY) \
+ --with github.com/dunglas/frankenphp=$(LOCAL_FRANKENPHP) \
+ --with github.com/bottledcode/durable-php/cli=$(LOCAL_MODULE)
+
+ext/build/ext_arginfo.h: ext/build/ext.stub.php
+ $(GEN_STUB) ext/build/ext.stub.php ext/build
\ No newline at end of file
diff --git a/cli/auth/keys.go b/cli/auth/keys.go
index 83445308..ff00e33e 100644
--- a/cli/auth/keys.go
+++ b/cli/auth/keys.go
@@ -2,11 +2,11 @@ package auth
import (
"context"
- "durable_php/appcontext"
- "durable_php/config"
"encoding/base64"
"errors"
"fmt"
+ "github.com/bottledcode/durable-php/cli/appcontext"
+ "github.com/bottledcode/durable-php/cli/config"
"github.com/golang-jwt/jwt/v4"
"net/http"
"strings"
diff --git a/cli/auth/keys_test.go b/cli/auth/keys_test.go
index abb281de..14658bf4 100644
--- a/cli/auth/keys_test.go
+++ b/cli/auth/keys_test.go
@@ -2,8 +2,8 @@ package auth
import (
"context"
- "durable_php/appcontext"
- "durable_php/config"
+ "github.com/bottledcode/durable-php/cli/appcontext"
+ "github.com/bottledcode/durable-php/cli/config"
"testing"
)
diff --git a/cli/auth/resource.go b/cli/auth/resource.go
index be0b7192..4bec8b3c 100644
--- a/cli/auth/resource.go
+++ b/cli/auth/resource.go
@@ -2,21 +2,28 @@ package auth
import (
"context"
- "durable_php/appcontext"
- "durable_php/glue"
- "durable_php/ids"
"encoding/json"
"errors"
"fmt"
- "github.com/nats-io/nats.go/jetstream"
- "go.uber.org/zap"
"net/http"
"os"
"slices"
"sync"
"time"
+
+ "github.com/bottledcode/durable-php/cli/appcontext"
+ "github.com/bottledcode/durable-php/cli/glue"
+ "github.com/bottledcode/durable-php/cli/ids"
+ "github.com/nats-io/nats.go/jetstream"
+ "go.uber.org/zap"
)
+// LocalMessageSender is a function type for sending local messages
+type LocalMessageSender func(method, stateId string, context map[string]interface{}) (interface{}, error)
+
+// SendLocalMessage is a global function pointer that gets set by the build package
+var SendLocalMessage LocalMessageSender
+
// Resource represents a resource with owners, shares, mode, expiration, and revision.
// It allows operations such as updating, sharing ownership, applying permissions,
// checking permissions, and serializing/deserializing to/from bytes.
@@ -139,19 +146,41 @@ func (r *Resource) getOrCreatePermissions(id *ids.StateId, ctx context.Context,
if cached, found := cache.Load(id.Name()); found {
perms = cached.(CreatePermissions)
} else {
- result, err := os.CreateTemp("", "")
- if err != nil {
- return perms, err
- }
- defer os.Remove(result.Name())
- result.Close()
-
- glu := glue.NewGlue(ctx.Value("bootstrap").(string), glue.GetPermissions, make([]any, 0), result.Name())
- env := map[string]string{"STATE_ID": id.String()}
- _, headers, _, _ := glu.Execute(ctx, make(http.Header), logger, env, nil, id, ids.SystemSource)
- data := headers.Get("Permissions")
- if err = json.Unmarshal([]byte(data), &perms); err != nil {
- return perms, err
+ if SendLocalMessage != nil {
+ // Use the new local message system
+ contextData := map[string]interface{}{
+ "bootstrap": ctx.Value("bootstrap"),
+ }
+
+ response, err := SendLocalMessage("getPermissions", id.String(), contextData)
+ if err != nil {
+ return perms, err
+ }
+
+ // The response should be JSON string
+ if responseStr, ok := response.(string); ok {
+ if err = json.Unmarshal([]byte(responseStr), &perms); err != nil {
+ return perms, err
+ }
+ } else {
+ return perms, errors.New("invalid response format from local message")
+ }
+ } else {
+ // Fallback to old glue system
+ result, err := os.CreateTemp("", "")
+ if err != nil {
+ return perms, err
+ }
+ defer os.Remove(result.Name())
+ result.Close()
+
+ glu := glue.NewGlue(ctx.Value("bootstrap").(string), glue.GetPermissions, make([]any, 0), result.Name())
+ env := map[string]string{"STATE_ID": id.String()}
+ _, headers, _, _ := glu.Execute(ctx, make(http.Header), logger, env, nil, id, ids.SystemSource)
+ data := headers.Get("Permissions")
+ if err = json.Unmarshal([]byte(data), &perms); err != nil {
+ return perms, err
+ }
}
cache.Store(id.Name(), perms)
}
diff --git a/cli/auth/resourceManager.go b/cli/auth/resourceManager.go
index 27aadab5..3c2a31b2 100644
--- a/cli/auth/resourceManager.go
+++ b/cli/auth/resourceManager.go
@@ -2,10 +2,10 @@ package auth
import (
"context"
- "durable_php/appcontext"
- "durable_php/glue"
- "durable_php/ids"
"encoding/json"
+ "github.com/bottledcode/durable-php/cli/appcontext"
+ "github.com/bottledcode/durable-php/cli/glue"
+ "github.com/bottledcode/durable-php/cli/ids"
"github.com/modern-go/concurrent"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
diff --git a/cli/auth/resource_test.go b/cli/auth/resource_test.go
index bf0b85b1..83869c51 100644
--- a/cli/auth/resource_test.go
+++ b/cli/auth/resource_test.go
@@ -2,8 +2,8 @@ package auth
import (
"context"
- "durable_php/appcontext"
"errors"
+ "github.com/bottledcode/durable-php/cli/appcontext"
"github.com/stretchr/testify/assert"
"testing"
"time"
diff --git a/cli/auth/user.go b/cli/auth/user.go
index e2e5913f..0951ae93 100644
--- a/cli/auth/user.go
+++ b/cli/auth/user.go
@@ -2,7 +2,7 @@ package auth
import (
"context"
- "durable_php/appcontext"
+ "github.com/bottledcode/durable-php/cli/appcontext"
"slices"
)
diff --git a/cli/cli.go b/cli/cli.go
index 53017760..157fd77c 100644
--- a/cli/cli.go
+++ b/cli/cli.go
@@ -1,3 +1,5 @@
+//go:build !nodphp
+
/*
* Copyright ©2024 Robert Landers
*
@@ -24,14 +26,14 @@ package main
import (
"context"
- "durable_php/auth"
- "durable_php/config"
- "durable_php/glue"
- "durable_php/ids"
- di "durable_php/init"
- "durable_php/lib"
"encoding/json"
"fmt"
+ "github.com/bottledcode/durable-php/cli/auth"
+ "github.com/bottledcode/durable-php/cli/config"
+ "github.com/bottledcode/durable-php/cli/glue"
+ "github.com/bottledcode/durable-php/cli/ids"
+ di "github.com/bottledcode/durable-php/cli/init"
+ "github.com/bottledcode/durable-php/cli/lib"
"github.com/dunglas/frankenphp"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats-server/v2/test"
diff --git a/cli/export.go b/cli/export.go
new file mode 100644
index 00000000..2cd3d702
--- /dev/null
+++ b/cli/export.go
@@ -0,0 +1,5 @@
+//go:build nodphp
+
+package cli
+
+import _ "github.com/bottledcode/durable-php/cli/ext/build"
diff --git a/cli/ext/build/README.md b/cli/ext/build/README.md
new file mode 100644
index 00000000..93804e1b
--- /dev/null
+++ b/cli/ext/build/README.md
@@ -0,0 +1,37 @@
+# ext Extension
+
+Auto-generated PHP extension from Go code.
+
+## Functions
+
+### emit_event
+
+```php
+emit_event(?array $userContext, array $event, string $from): int
+```
+
+**Parameters:**
+
+- `userContext` (array) (nullable)
+- `event` (array)
+- `from` (string)
+
+**Returns:** int
+
+## Classes
+
+### Worker
+
+**Properties:**
+
+- `kind`: mixed
+- `started`: bool
+- `consumer`: mixed (nullable)
+- `activeId`: mixed (nullable)
+- `state`: mixed (nullable)
+- `pendingEvents`: array
+- `authContext`: array
+- `currentCtx`: mixed
+- `currentMsg`: mixed
+
+
diff --git a/cli/ext/build/ext.c b/cli/ext/build/ext.c
new file mode 100644
index 00000000..0d7957a0
--- /dev/null
+++ b/cli/ext/build/ext.c
@@ -0,0 +1,360 @@
+#include
+#include
+#include
+#include
+#include
+
+#include "ext.h"
+#include "ext_arginfo.h"
+#include "_cgo_export.h"
+
+#define VALIDATE_GO_HANDLE(intern) \
+ do { \
+ if ((intern)->go_handle == 0) { \
+ zend_throw_error(NULL, "Go object not found in registry"); \
+ RETURN_THROWS(); \
+ } \
+ } while (0)
+
+static zend_object_handlers object_handlers_ext;
+
+typedef struct {
+ uintptr_t go_handle;
+ zend_object std; /* This must be the last field in the structure: the property store starts at this offset */
+} ext_object;
+
+static inline ext_object *ext_object_from_obj(zend_object *obj) {
+ return (ext_object*)((char*)(obj) - offsetof(ext_object, std));
+}
+
+static zend_object *ext_create_object(zend_class_entry *ce) {
+ fprintf(stderr, "[C DEBUG] ext_create_object called!\n");
+ fflush(stderr);
+
+ /* Always allocate without properties due to corrupted class entry */
+ ext_object *intern = ecalloc(1, sizeof(ext_object));
+
+ fprintf(stderr, "[C DEBUG] About to call zend_object_std_init\n");
+ fflush(stderr);
+
+ zend_object_std_init(&intern->std, ce);
+
+ fprintf(stderr, "[C DEBUG] zend_object_std_init completed\n");
+ fflush(stderr);
+
+ /* Skip object_properties_init due to corrupted class entry */
+
+ intern->std.handlers = &object_handlers_ext;
+ intern->go_handle = 0; /* will be set in __construct */
+
+ fprintf(stderr, "[C DEBUG] ext_create_object returning\n");
+ fflush(stderr);
+
+ return &intern->std;
+}
+
+static void ext_free_object(zend_object *object) {
+ ext_object *intern = ext_object_from_obj(object);
+
+ if (intern->go_handle != 0) {
+ removeGoObject(intern->go_handle);
+ }
+
+ zend_object_std_dtor(&intern->std);
+}
+
+void init_object_handlers() {
+ memcpy(&object_handlers_ext, &std_object_handlers, sizeof(zend_object_handlers));
+ object_handlers_ext.free_obj = ext_free_object;
+ object_handlers_ext.clone_obj = NULL;
+ object_handlers_ext.offset = offsetof(ext_object, std);
+}
+
+static zend_class_entry *Worker_ce = NULL;
+static __thread uintptr_t current_go_worker_handle = 0; /* Thread-local Go worker handle */
+
+PHP_METHOD(Bottledcode_DurablePhp_Ext_Worker, __construct) {
+ fprintf(stderr, "[C DEBUG] Constructor called\n");
+ fflush(stderr);
+
+ ZEND_PARSE_PARAMETERS_NONE();
+
+ fprintf(stderr, "[C DEBUG] Constructor parameters parsed\n");
+ fflush(stderr);
+
+ ext_object *intern = ext_object_from_obj(Z_OBJ_P(ZEND_THIS));
+
+ fprintf(stderr, "[C DEBUG] Got intern object, go_handle=%lu\n", intern->go_handle);
+ fflush(stderr);
+
+ /* Constructor is called more than once, make it no-op */
+ if (intern->go_handle != 0) {
+ fprintf(stderr, "[C DEBUG] Constructor already called, returning\n");
+ fflush(stderr);
+ return;
+ }
+
+ /* Use the thread-local Go worker handle if available */
+ if (current_go_worker_handle != 0) {
+ fprintf(stderr, "[C DEBUG] Constructor using thread-local handle %lu\n", current_go_worker_handle);
+ fflush(stderr);
+ intern->go_handle = current_go_worker_handle;
+ } else {
+ /* Fallback: create a new Worker object */
+ fprintf(stderr, "[C DEBUG] Constructor creating new Worker object\n");
+ fflush(stderr);
+ intern->go_handle = create_Worker_object();
+ }
+
+ fprintf(stderr, "[C DEBUG] Constructor completed\n");
+ fflush(stderr);
+}
+
+PHP_METHOD(Bottledcode_DurablePhp_Ext_Worker, __destruct) {
+ ext_object *intern = ext_object_from_obj(Z_OBJ_P(ZEND_THIS));
+
+ VALIDATE_GO_HANDLE(intern);
+ ZEND_PARSE_PARAMETERS_NONE();
+
+ __destruct_wrapper(intern->go_handle);
+}
+
+
+PHP_METHOD(Bottledcode_DurablePhp_Ext_Worker, queryState) {
+ ext_object *intern = ext_object_from_obj(Z_OBJ_P(ZEND_THIS));
+
+ VALIDATE_GO_HANDLE(intern);
+ zend_string *stateId = NULL;
+
+ ZEND_PARSE_PARAMETERS_START(1, 1)
+ Z_PARAM_STR(stateId)
+ ZEND_PARSE_PARAMETERS_END();
+
+ void* result = queryState_wrapper(intern->go_handle, stateId);
+ if (result != NULL) {
+ HashTable *ht = (HashTable*)result;
+ RETURN_ARR(ht);
+ } else {
+ RETURN_NULL();
+ }
+}
+
+PHP_METHOD(Bottledcode_DurablePhp_Ext_Worker, getUser) {
+ ext_object *intern = ext_object_from_obj(Z_OBJ_P(ZEND_THIS));
+
+ VALIDATE_GO_HANDLE(intern);
+ ZEND_PARSE_PARAMETERS_NONE();
+
+ void* result = getUser_wrapper(intern->go_handle);
+ if (result != NULL) {
+ HashTable *ht = (HashTable*)result;
+ RETURN_ARR(ht);
+ } else {
+ RETURN_NULL();
+ }
+}
+
+PHP_METHOD(Bottledcode_DurablePhp_Ext_Worker, setUser) {
+ ext_object *intern = ext_object_from_obj(Z_OBJ_P(ZEND_THIS));
+
+ zval *ht;
+
+ VALIDATE_GO_HANDLE(intern);
+ ZEND_PARSE_PARAMETERS_START(1, 1)
+ Z_PARAM_ARRAY_OR_NULL(ht)
+ ZEND_PARSE_PARAMETERS_END();
+
+ setUser_wrapper(intern->go_handle, ht);
+}
+
+PHP_METHOD(Bottledcode_DurablePhp_Ext_Worker, getSource) {
+ ext_object *intern = ext_object_from_obj(Z_OBJ_P(ZEND_THIS));
+
+ VALIDATE_GO_HANDLE(intern);
+ ZEND_PARSE_PARAMETERS_NONE();
+
+ zend_string* result = getSource_wrapper(intern->go_handle);
+ RETURN_STR(result);
+}
+
+PHP_METHOD(Bottledcode_DurablePhp_Ext_Worker, getCurrentId) {
+ ext_object *intern = ext_object_from_obj(Z_OBJ_P(ZEND_THIS));
+
+ VALIDATE_GO_HANDLE(intern);
+ ZEND_PARSE_PARAMETERS_NONE();
+
+ zend_string* result = getCurrentId_wrapper(intern->go_handle);
+ RETURN_STR(result);
+}
+
+PHP_METHOD(Bottledcode_DurablePhp_Ext_Worker, getCorrelationId) {
+ ext_object *intern = ext_object_from_obj(Z_OBJ_P(ZEND_THIS));
+
+ VALIDATE_GO_HANDLE(intern);
+ ZEND_PARSE_PARAMETERS_NONE();
+
+ zend_string* result = getCorrelationId_wrapper(intern->go_handle);
+ RETURN_STR(result);
+}
+
+PHP_METHOD(Bottledcode_DurablePhp_Ext_Worker, getState) {
+ ext_object *intern = ext_object_from_obj(Z_OBJ_P(ZEND_THIS));
+
+ VALIDATE_GO_HANDLE(intern);
+ ZEND_PARSE_PARAMETERS_NONE();
+
+ void* result = getState_wrapper(intern->go_handle);
+ if (result != NULL) {
+ HashTable *ht = (HashTable*)result;
+ RETURN_ARR(ht);
+ } else {
+ RETURN_NULL();
+ }
+}
+
+PHP_METHOD(Bottledcode_DurablePhp_Ext_Worker, updateState) {
+ ext_object *intern = ext_object_from_obj(Z_OBJ_P(ZEND_THIS));
+
+ VALIDATE_GO_HANDLE(intern);
+ zval *state = NULL;
+
+ ZEND_PARSE_PARAMETERS_START(1, 1)
+ Z_PARAM_ARRAY(state)
+ ZEND_PARSE_PARAMETERS_END();
+
+ updateState_wrapper(intern->go_handle, state);
+}
+
+PHP_METHOD(Bottledcode_DurablePhp_Ext_Worker, emitEvent) {
+ ext_object *intern = ext_object_from_obj(Z_OBJ_P(ZEND_THIS));
+
+ VALIDATE_GO_HANDLE(intern);
+ zval *eventDescription = NULL;
+
+ ZEND_PARSE_PARAMETERS_START(1, 1)
+ Z_PARAM_ARRAY(eventDescription)
+ ZEND_PARSE_PARAMETERS_END();
+
+ emitEvent_wrapper(intern->go_handle, eventDescription);
+}
+
+PHP_METHOD(Bottledcode_DurablePhp_Ext_Worker, delete) {
+ ext_object *intern = ext_object_from_obj(Z_OBJ_P(ZEND_THIS));
+
+ VALIDATE_GO_HANDLE(intern);
+ ZEND_PARSE_PARAMETERS_NONE();
+
+ delete_wrapper(intern->go_handle);
+}
+
+PHP_METHOD(Bottledcode_DurablePhp_Ext_Worker, GetCurrent) {
+ fprintf(stderr, "[C DEBUG] GetCurrent method entry\n");
+ fflush(stderr);
+
+ ZEND_PARSE_PARAMETERS_NONE();
+
+ fprintf(stderr, "[C DEBUG] GetCurrent called, thread-local handle=%lu\n", current_go_worker_handle);
+ fflush(stderr);
+
+ /* If no thread-local worker handle, return NULL */
+ if (current_go_worker_handle == 0) {
+ fprintf(stderr, "[C DEBUG] No thread-local handle, returning NULL\n");
+ fflush(stderr);
+ RETURN_NULL();
+ }
+
+ fprintf(stderr, "[C DEBUG] About to create Worker object with object_init_ex\n");
+ fflush(stderr);
+
+ /* Create a new Worker object - PHP constructor will pick up the thread-local handle */
+ zval obj;
+ if (object_init_ex(&obj, Worker_ce) != SUCCESS) {
+ fprintf(stderr, "[C DEBUG] object_init_ex failed\n");
+ fflush(stderr);
+ RETURN_NULL();
+ }
+
+ fprintf(stderr, "[C DEBUG] object_init_ex succeeded, returning object\n");
+ fflush(stderr);
+
+ RETURN_ZVAL(&obj, 0, 0);
+}
+
+void register_all_classes() {
+ init_object_handlers();
+ Worker_ce = register_class_Bottledcode_DurablePhp_Ext_Worker();
+ if (!Worker_ce) {
+ php_error_docref(NULL, E_ERROR, "Failed to register class Worker");
+ return;
+ }
+ Worker_ce->create_object = ext_create_object;
+}
+
+/* Function to set the current worker from Go */
+void set_current_worker_handle(uintptr_t handle) {
+ fprintf(stderr, "[C DEBUG] Setting thread-local worker handle to %lu\n", handle);
+ fflush(stderr);
+ current_go_worker_handle = handle;
+}
+
+/* Function to get current worker handle */
+uintptr_t get_current_worker_handle() {
+ return current_go_worker_handle;
+}
+
+/* Function to clear current worker */
+void clear_current_worker() {
+ fprintf(stderr, "[C DEBUG] Clearing thread-local worker handle\n");
+ fflush(stderr);
+ current_go_worker_handle = 0;
+}
+
+PHP_MINIT_FUNCTION(ext) {
+ fprintf(stderr, "[C DEBUG] MINIT starting\n");
+ fflush(stderr);
+
+ register_all_classes();
+ fprintf(stderr, "[C DEBUG] Classes registered\n");
+ fflush(stderr);
+
+ go_init_module();
+ fprintf(stderr, "[C DEBUG] Go module initialized\n");
+ fflush(stderr);
+
+ return SUCCESS;
+}
+
+
+
+PHP_MSHUTDOWN_FUNCTION(ext) {
+ go_shutdown_module();
+ return SUCCESS;
+}
+
+
+
+zend_module_entry ext_module_entry = {STANDARD_MODULE_HEADER,
+ "ext",
+ ext_functions, /* Functions */
+ PHP_MINIT(ext), /* MINIT */
+ PHP_MSHUTDOWN(ext), /* MSHUTDOWN */
+ NULL, /* RINIT */
+ NULL, /* RSHUTDOWN */
+ NULL, /* MINFO */
+ "1.0.0", /* Version */
+ STANDARD_MODULE_PROPERTIES};
+
+PHP_FUNCTION(Bottledcode_DurablePhp_Ext_emit_event)
+{
+ zval *userContext = NULL;
+ zval *event = NULL;
+ zend_string *from = NULL;
+ ZEND_PARSE_PARAMETERS_START(3, 3)
+ Z_PARAM_ARRAY_OR_NULL(userContext)
+ Z_PARAM_ARRAY(event)
+ Z_PARAM_STR(from)
+ ZEND_PARSE_PARAMETERS_END();
+ long result = emit_event(userContext, event, from);
+ RETURN_LONG(result);
+}
+
diff --git a/cli/ext/build/ext.go b/cli/ext/build/ext.go
new file mode 100644
index 00000000..c5fe6d8f
--- /dev/null
+++ b/cli/ext/build/ext.go
@@ -0,0 +1,969 @@
+package build
+
+/*
+#include
+#include "ext.h"
+
+void set_current_worker_handle(uintptr_t handle);
+void clear_current_worker(void);
+*/
+import "C"
+import (
+ "runtime/cgo"
+)
+import "unsafe"
+import "github.com/dunglas/frankenphp"
+import "context"
+import "encoding/json"
+import "errors"
+import "net/http"
+import "os"
+import "strings"
+import "sync"
+import "time"
+import "github.com/bottledcode/durable-php/cli/appcontext"
+import "github.com/bottledcode/durable-php/cli/auth"
+import "github.com/bottledcode/durable-php/cli/config"
+import "github.com/bottledcode/durable-php/cli/ext/helpers"
+import "github.com/bottledcode/durable-php/cli/glue"
+import "github.com/bottledcode/durable-php/cli/ids"
+import "github.com/bottledcode/durable-php/cli/lib"
+import "github.com/nats-io/nats-server/v2/server"
+import "github.com/nats-io/nats-server/v2/test"
+import "github.com/nats-io/nats.go"
+import "github.com/nats-io/nats.go/jetstream"
+import "go.uber.org/zap"
+
+// LocalMessage represents a local synchronous request
+type LocalMessage struct {
+ Method string // "getPermissions", etc.
+ StateId string // The state ID for the request
+ Context map[string]interface{} // Additional context data
+ ResponseCh chan interface{} // Channel to send response back
+}
+
+type frankenphpWorker struct {
+ requestChan chan jetstream.Msg // Channel for NATS messages
+ localMessageChan chan *LocalMessage // Channel for local synchronous requests
+ running bool
+}
+
+var globalWorkerInstance *frankenphpWorker
+
+func (w *frankenphpWorker) Name() string {
+ return "m#durable-php"
+}
+
+func (w *frankenphpWorker) FileName() string {
+ // check if target exists
+ if _, err := os.Stat("src/Glue/frankenphpWorker.php"); !os.IsNotExist(err) {
+ return "src/Glue/frankenphpWorker.php"
+ }
+
+ return "vendor/bottledcode/durable-php/src/Glue/frankenphpWorker.php"
+}
+
+func (w *frankenphpWorker) Env() frankenphp.PreparedEnv {
+ return frankenphp.PreparedEnv{}
+}
+
+func (w *frankenphpWorker) GetMinThreads() int {
+ return 4
+}
+
+func (w *frankenphpWorker) ThreadActivatedNotification(threadId int) {
+}
+
+func (w *frankenphpWorker) ThreadDrainNotification(threadId int) {
+}
+
+func (w *frankenphpWorker) ThreadDeactivatedNotification(threadId int) {
+}
+
+func (w *frankenphpWorker) ProvideRequest() *frankenphp.WorkerRequest {
+ // Select between NATS messages and local messages
+ select {
+ case msg := <-w.requestChan:
+ // Process NATS message
+ return processMessage(msg)
+ case localMsg := <-w.localMessageChan:
+ // Process local message
+ return processLocalMessage(localMsg)
+ }
+}
+
+func processMessage(msg jetstream.Msg) *frankenphp.WorkerRequest {
+ ctx := helpers.Ctx
+ logger := helpers.Logger
+ js := helpers.Js
+
+ // Process the message using the logic from getNextEvent
+ meta, _ := msg.Metadata()
+ headers := msg.Headers()
+
+ currentUser := &auth.User{}
+ b := msg.Headers().Get(string(glue.HeaderProvenance))
+ err := json.Unmarshal([]byte(b), currentUser)
+ if err != nil {
+ logger.Warn("Failed to unmarshal event provenance",
+ zap.Any("Provenance", msg.Headers().Get(string(glue.HeaderProvenance))),
+ zap.Error(err),
+ )
+ currentUser = nil
+ } else {
+ ctx = auth.DecorateContextWithUser(ctx, currentUser)
+ }
+
+ // Handle delayed messages
+ if headers.Get(string(glue.HeaderDelay)) != "" && meta.NumDelivered == 1 {
+ logger.Debug("Delaying message", zap.String("delay", msg.Headers().Get("Delay")), zap.Any("Headers", meta))
+ schedule, err := time.Parse(time.RFC3339, msg.Headers().Get("Delay"))
+ if err != nil {
+ helpers.LogError(err.Error())
+ return nil
+ }
+
+ delay := time.Until(schedule)
+ if err := msg.NakWithDelay(delay); err != nil {
+ helpers.LogError(err.Error())
+ return nil
+ }
+
+ // Recursively handle delayed message by putting it back in channel
+ globalWorkerInstance.requestChan <- msg
+ return nil
+ }
+
+ // Handle delete messages
+ if strings.HasSuffix(msg.Subject(), ".delete") {
+ id := ids.ParseStateId(msg.Headers().Get(string(glue.HeaderStateId)))
+ err := glue.DeleteState(ctx, js, logger, id)
+ if err != nil {
+ helpers.LogError(err.Error())
+ return nil
+ }
+ // Return nil to get next message
+ return nil
+ }
+
+ // Determine the kind from the message subject
+ var kind ids.IdKind
+ if strings.Contains(msg.Subject(), ".activity.") {
+ kind = ids.Activity
+ } else if strings.Contains(msg.Subject(), ".entity.") {
+ kind = ids.Entity
+ } else if strings.Contains(msg.Subject(), ".orchestration.") {
+ kind = ids.Orchestration
+ }
+
+ // Create frankenphpWorker context for this message
+ worker := &Worker{
+ kind: kind,
+ currentMsg: msg,
+ }
+
+ worker.currentCtx = lib.GetCorrelationId(ctx, nil, &headers)
+
+ // Set the current frankenphpWorker for PHP access BEFORE processing/authorization
+ helpers.Logger.Info("About to call SetCurrentWorker")
+ SetCurrentWorker(worker)
+ helpers.Logger.Info("SetCurrentWorker completed")
+
+ rm := auth.GetResourceManager(ctx, js)
+
+ worker.authContext, worker.activeId, worker.state, err = lib.ProcessMessage(ctx, logger, msg, rm, helpers.Config, js)
+ if err != nil {
+ helpers.LogError(err.Error())
+ return nil
+ }
+
+ // Create an HTTP request from the message
+ httpReq, err := http.NewRequest("POST", "/worker", strings.NewReader(string(msg.Data())))
+ if err != nil {
+ helpers.LogError(err.Error())
+ return nil
+ }
+
+ httpReq.Header.Set("Content-Type", "application/json")
+ httpReq.Header.Set("X-Correlation-ID", worker.currentCtx.Value("cid").(string))
+ httpReq.Header.Set("X-State-ID", worker.activeId.String())
+ httpReq.Header.Set("X-Event-Type", msg.Headers().Get(string(glue.HeaderEventType)))
+ httpReq.Header.Set("X-Source-ID", msg.Headers().Get(string(glue.HeaderEmittedBy)))
+
+ req := &frankenphp.WorkerRequest{
+ Request: httpReq,
+ Response: nil, // Response writer will be provided by FrankenPHP
+ Done: make(chan struct{}),
+ }
+
+ return req
+}
+
+func processLocalMessage(localMsg *LocalMessage) *frankenphp.WorkerRequest {
+ // Create a synthetic HTTP request for the local message
+ httpReq, err := http.NewRequest("POST", "/worker", strings.NewReader(""))
+ if err != nil {
+ helpers.LogError(err.Error())
+ return nil
+ }
+
+ // Set headers to indicate this is a local request
+ httpReq.Header.Set("Content-Type", "application/json")
+ httpReq.Header.Set("DPHP_FUNCTION", localMsg.Method)
+ httpReq.Header.Set("STATE_ID", localMsg.StateId)
+
+ // Add any additional context as headers
+ for key, value := range localMsg.Context {
+ if str, ok := value.(string); ok {
+ httpReq.Header.Set(strings.ToUpper(key), str)
+ }
+ }
+
+ doneCh := make(chan struct{})
+ responseWriter := &localResponseWriter{
+ responseCh: localMsg.ResponseCh,
+ doneCh: doneCh,
+ buffer: make([]byte, 0),
+ }
+
+ req := &frankenphp.WorkerRequest{
+ Request: httpReq,
+ Response: responseWriter,
+ Done: doneCh,
+ }
+
+ return req
+}
+
+// localResponseWriter implements http.ResponseWriter and sends response to the channel
+type localResponseWriter struct {
+ responseCh chan interface{}
+ doneCh chan struct{}
+ buffer []byte
+ sent bool
+}
+
+func (w *localResponseWriter) Header() http.Header {
+ return make(http.Header)
+}
+
+func (w *localResponseWriter) Write(data []byte) (int, error) {
+ // Accumulate response data in buffer
+ w.buffer = append(w.buffer, data...)
+
+ // Start a goroutine to wait for completion if not already done
+ if !w.sent {
+ w.sent = true
+ go w.waitForCompletion()
+ }
+
+ return len(data), nil
+}
+
+func (w *localResponseWriter) WriteHeader(statusCode int) {
+ // For local requests, we don't need to handle status codes
+}
+
+func (w *localResponseWriter) waitForCompletion() {
+ // Wait for the request to complete
+ <-w.doneCh
+
+ // Send the complete response
+ w.responseCh <- string(w.buffer)
+ close(w.responseCh)
+}
+
+func init() {
+ frankenphp.RegisterExtension(unsafe.Pointer(&C.ext_module_entry))
+
+ // initialize the workers
+ globalWorkerInstance = &frankenphpWorker{
+ requestChan: make(chan jetstream.Msg, 100), // Buffer for 100 messages
+ localMessageChan: make(chan *LocalMessage, 10), // Buffer for 10 local messages
+ }
+ frankenphp.RegisterExternalWorker(globalWorkerInstance)
+
+ // Set the local message sender for the auth package
+ auth.SendLocalMessage = SendLocalMessage
+}
+
+// SendLocalMessage sends a local synchronous request and waits for response
+func SendLocalMessage(method, stateId string, context map[string]interface{}) (interface{}, error) {
+ responseCh := make(chan interface{}, 1)
+
+ localMsg := &LocalMessage{
+ Method: method,
+ StateId: stateId,
+ Context: context,
+ ResponseCh: responseCh,
+ }
+
+ // Send the local message
+ select {
+ case globalWorkerInstance.localMessageChan <- localMsg:
+ // Wait for response
+ response := <-responseCh
+ return response, nil
+ default:
+ return nil, errors.New("local message channel is full")
+ }
+}
+
+// StartNATSMessageFeeder starts goroutines that feed NATS messages to the frankenphpWorker channel
+func StartNATSMessageFeeder(ctx context.Context, cfg *config.Config, logger *zap.Logger) {
+ js := helpers.Js
+
+ consumers := []ids.IdKind{ids.Activity, ids.Entity, ids.Orchestration}
+
+ for _, kind := range consumers {
+ go func(kind ids.IdKind) {
+ for {
+ stream, err := js.Stream(ctx, cfg.Stream)
+ if err != nil {
+ logger.Error("Failed to get stream", zap.Error(err))
+ time.Sleep(time.Second)
+ continue
+ }
+
+ consumer, err := stream.Consumer(ctx, cfg.Stream+"-"+string(kind))
+ if err != nil {
+ logger.Error("Failed to get consumer", zap.String("kind", string(kind)), zap.Error(err))
+ time.Sleep(time.Second)
+ continue
+ }
+
+ iter, err := consumer.Messages(jetstream.PullMaxMessages(1), jetstream.WithMessagesErrOnMissingHeartbeat(false))
+ if err != nil {
+ logger.Error("Failed to create message iterator", zap.String("kind", string(kind)), zap.Error(err))
+ time.Sleep(time.Second)
+ continue
+ }
+
+ for {
+ msg, err := iter.Next()
+ if err != nil {
+ logger.Debug("Error getting next message", zap.String("kind", string(kind)), zap.Error(err))
+ break // Break inner loop to recreate consumer
+ }
+
+ // Send message to frankenphpWorker channel
+ select {
+ case globalWorkerInstance.requestChan <- msg:
+ // Message sent successfully
+ case <-ctx.Done():
+ return
+ }
+ }
+ }
+ }(kind)
+ }
+}
+
+//export go_init_module
+func go_init_module() {
+ cfg, err := config.GetProjectConfig()
+ if err != nil {
+ panic(err)
+ }
+ helpers.Config = cfg
+
+ helpers.Logger = helpers.GetLogger(zap.DebugLevel)
+ logger := helpers.Logger
+
+ logger.Info("Starting Durable PHP")
+
+ helpers.Ctx = context.WithValue(context.Background(), "bootstrap", cfg.Bootstrap)
+
+ boostrapNats := cfg.Nat.Bootstrap
+ if cfg.Nat.Internal {
+ logger.Warn("Running in dev mode, all data will be deleted at the end of this")
+ helpers.NatsState, err = os.MkdirTemp("", "nats-state-*")
+ if err != nil {
+ panic(err)
+ }
+
+ helpers.NatServer = test.RunServer(&server.Options{
+ Host: "localhost",
+ Port: 4222,
+ NoLog: true,
+ NoSigs: true,
+ JetStream: true,
+ MaxControlLine: 2048,
+ StoreDir: helpers.NatsState,
+ HTTPPort: 8222,
+ })
+ boostrapNats = true
+ }
+
+ nopts := []nats.Option{
+ nats.Compression(true),
+ nats.RetryOnFailedConnect(true),
+ }
+
+ if cfg.Nat.Jwt != "" && cfg.Nat.Nkey != "" {
+ nopts = append(nopts, nats.UserCredentials(cfg.Nat.Jwt, cfg.Nat.Nkey))
+ }
+
+ if cfg.Nat.Tls.Ca != "" {
+ nopts = append(nopts, nats.RootCAs(strings.Split(cfg.Nat.Tls.Ca, ",")...))
+ }
+
+ if cfg.Nat.Tls.KeyFile != "" {
+ nopts = append(nopts, nats.ClientCert(cfg.Nat.Tls.ClientCert, cfg.Nat.Tls.KeyFile))
+ }
+
+ ns, err := nats.Connect(cfg.Nat.Url, nopts...)
+ if err != nil {
+ panic(err)
+ }
+ helpers.Js, err = jetstream.New(ns)
+ if err != nil {
+ panic(err)
+ }
+ ctx := context.WithValue(context.Background(), "bootstrap", cfg.Bootstrap)
+
+ if boostrapNats {
+ stream, _ := helpers.Js.CreateStream(ctx, jetstream.StreamConfig{
+ Name: cfg.Stream,
+ Description: "Handles durable-php events",
+ Subjects: []string{cfg.Stream + ".>"},
+ Retention: jetstream.WorkQueuePolicy,
+ Storage: jetstream.FileStorage,
+ AllowRollup: false,
+ DenyDelete: true,
+ DenyPurge: true,
+ })
+ _, _ = helpers.Js.CreateStream(ctx, jetstream.StreamConfig{
+ Name: cfg.Stream + "_history",
+ Description: "The history of the stream",
+ Mirror: &jetstream.StreamSource{
+ Name: cfg.Stream,
+ },
+ Retention: jetstream.LimitsPolicy,
+ AllowRollup: true,
+ MaxAge: 7 * 24 * time.Hour,
+ Discard: jetstream.DiscardOld,
+ })
+
+ consumers := []string{
+ string(ids.Activity),
+ string(ids.Entity),
+ string(ids.Orchestration),
+ }
+
+ for _, kind := range consumers {
+ _, _ = stream.CreateConsumer(ctx, jetstream.ConsumerConfig{
+ Durable: cfg.Stream + "-" + kind,
+ FilterSubject: cfg.Stream + "." + kind + ".>",
+ AckPolicy: jetstream.AckExplicitPolicy,
+ AckWait: 5 * time.Minute,
+ })
+ }
+ }
+
+ if len(cfg.Extensions.Search.Collections) > 0 {
+ for _, collection := range cfg.Extensions.Search.Collections {
+ switch collection {
+ case "entities":
+ err := lib.IndexerListen(ctx, cfg, ids.Entity, helpers.Js, logger)
+ if err != nil {
+ cfg.Extensions.Search.Collections = []string{}
+ logger.Warn("Disabling search extension due to failing to connect to typesense")
+ }
+ case "orchestrations":
+ err := lib.IndexerListen(ctx, cfg, ids.Orchestration, helpers.Js, logger)
+ if err != nil {
+ cfg.Extensions.Search.Collections = []string{}
+ logger.Warn("Disabling search extension due to failing to connect to typesense")
+ }
+ }
+ }
+ }
+
+ if cfg.Extensions.Billing.Enabled {
+ if cfg.Extensions.Billing.Listen {
+
+ billings := sync.Map{}
+ billings.Store("e", 0)
+ billings.Store("o", 0)
+ billings.Store("a", 0*time.Minute)
+ billings.Store("ac", 0)
+
+ var incrementInt func(key string, amount int)
+ incrementInt = func(key string, amount int) {
+ var old interface{}
+ old, _ = billings.Load(key)
+ if !billings.CompareAndSwap(key, old, old.(int)+1) {
+ incrementInt(key, amount)
+ }
+ }
+
+ var incrementDur func(key string, amount time.Duration)
+ incrementDur = func(key string, amount time.Duration) {
+ var old interface{}
+ old, _ = billings.Load(key)
+ if !billings.CompareAndSwap(key, old, old.(time.Duration)+amount) {
+ incrementDur(key, amount)
+ }
+ }
+
+ /*
+ outputBillingStatus := func() {
+ costC := func(num interface{}, basis int) float64 {
+ return float64(num.(int)) * float64(basis) / 10_000_000
+ }
+
+ costA := func(dur interface{}, basis int) float64 {
+ duration := dur.(time.Duration)
+ seconds := duration.Seconds()
+ return float64(basis) * seconds / 100_000
+ }
+
+ avg := func(dur interface{}, count interface{}) time.Duration {
+ seconds := dur.(time.Duration).Seconds()
+ return time.Duration(seconds/float64(count.(int))) * time.Second
+ }
+
+ e, _ := billings.Load("e")
+ o, _ := billings.Load("o")
+ ac, _ := billings.Load("ac")
+ a, _ := billings.Load("a")
+
+ ecost := costC(e, cfg.Extensions.Billing.Costs.Entities.Cost)
+ ocost := costC(o, cfg.Extensions.Billing.Costs.Orchestrations.Cost)
+ acost := costA(a, cfg.Extensions.Billing.Costs.Activities.Cost)
+
+ logger.Warn("Billing estimate",
+ zap.Any("launched entities", e),
+ zap.String("entity cost", fmt.Sprintf("$%.2f", ecost)),
+ zap.Any("launched orchestrations", o),
+ zap.String("orchestration cost", fmt.Sprintf("$%.2f", ocost)),
+ zap.Any("activity time", a),
+ zap.Any("activities launced", ac),
+ zap.Any("average activity time", avg(a, ac)),
+ zap.String("activity cost", fmt.Sprintf("$%.2f", acost)),
+ zap.String("total estimate", fmt.Sprintf("$%.2f", ecost+ocost+acost)),
+ )
+ }
+
+ go func() {
+ ticker := time.NewTicker(3 * time.Second)
+ for range ticker.C {
+ outputBillingStatus()
+ }
+ }()
+ */
+
+ billingStream, err := helpers.Js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
+ Name: "billing",
+ Subjects: []string{
+ "billing." + cfg.Stream + ".>",
+ },
+ Storage: jetstream.FileStorage,
+ Retention: jetstream.LimitsPolicy,
+ MaxAge: 7 * 24 * time.Hour,
+ })
+ if err != nil {
+ panic(err)
+ }
+
+ entityConsumer, err := billingStream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
+ Durable: "entityAggregator",
+ FilterSubjects: []string{
+ "billing." + cfg.Stream + ".entities.>",
+ },
+ })
+ if err != nil {
+ panic(err)
+ }
+
+ _, err = entityConsumer.Consume(func(msg jetstream.Msg) {
+ incrementInt("e", 1)
+ msg.Ack()
+ })
+ if err != nil {
+ panic(err)
+ }
+ //defer consume.Drain()
+
+ orchestrationConsumer, err := billingStream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
+ Durable: "orchestrationAggregator",
+ FilterSubject: "billing." + cfg.Stream + ".orchestrations.>",
+ })
+ if err != nil {
+ panic(err)
+ }
+
+ _, err = orchestrationConsumer.Consume(func(msg jetstream.Msg) {
+ incrementInt("o", 1)
+ msg.Ack()
+ })
+ if err != nil {
+ panic(err)
+ }
+ //defer consume.Drain()
+
+ activityConsumer, err := billingStream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
+ Durable: "activityAggregator",
+ FilterSubject: "billing." + cfg.Stream + ".activities.>",
+ })
+ if err != nil {
+ panic(err)
+ }
+
+ _, err = activityConsumer.Consume(func(msg jetstream.Msg) {
+ incrementInt("ac", 1)
+ var ev lib.BillingEvent
+ err := json.Unmarshal(msg.Data(), &ev)
+ if err != nil {
+ panic(err)
+ }
+ incrementDur("a", ev.Duration)
+ msg.Ack()
+ })
+ if err != nil {
+ panic(err)
+ }
+ //defer consume.Drain()
+ }
+
+ err := lib.StartBillingProcessor(ctx, cfg, helpers.Js, logger)
+ if err != nil {
+ panic(err)
+ }
+ }
+
+ // Start NATS message feeder for frankenphpWorker
+ StartNATSMessageFeeder(ctx, cfg, logger)
+}
+
+//export go_shutdown_module
+func go_shutdown_module() {
+ if helpers.NatServer != nil {
+ helpers.NatServer.Shutdown()
+ }
+ // remove nats state directory
+ os.RemoveAll(helpers.NatsState)
+}
+
+//export emit_event
+func emit_event(userContext *C.zval, event *C.zval, fromStr *C.zend_string) int64 {
+
+ userVal := frankenphp.GoArray(unsafe.Pointer(userContext))
+
+ var user *auth.User
+ ctx, cancel := context.WithCancel(helpers.Ctx)
+ defer cancel()
+
+ if userVal != nil {
+ user = helpers.GetUserContext(userVal)
+ if user.UserId == "" || len(user.Roles) == 0 {
+ helpers.LogError("User context is missing userId or roles")
+ return 0
+ }
+ ctx = auth.DecorateContextWithUser(ctx, user)
+ }
+
+ from := ids.ParseStateId(frankenphp.GoString(unsafe.Pointer(fromStr)))
+
+ eventArr := frankenphp.GoArray(unsafe.Pointer(event))
+ ev, err := helpers.ParseEvent(eventArr)
+ if err != nil {
+ helpers.LogError(err.Error())
+ return 0
+ }
+
+ replyTo := ""
+ if ev.ReplyTo != "" {
+ replyTo = ids.ParseStateId(ev.ReplyTo).ToSubject().String()
+ }
+
+ splitType := strings.Split(ev.EventType, "\\")
+ eventType := splitType[len(splitType)-1]
+
+ destinationId := ids.ParseStateId(ev.Destination)
+
+ now, err := time.Now().MarshalText()
+ if err != nil {
+ helpers.LogError(err.Error())
+ return 0
+ }
+
+ userJson, err := json.Marshal(user)
+ if err != nil {
+ helpers.LogError(err.Error())
+ return 0
+ }
+
+ header := make(nats.Header)
+ header.Add(string(glue.HeaderStateId), destinationId.String())
+ header.Add(string(glue.HeaderEventType), eventType)
+ header.Add(string(glue.HeaderTargetType), ev.TargetType)
+ header.Add(string(glue.HeaderEmittedAt), string(now))
+ header.Add(string(glue.HeaderProvenance), string(userJson))
+ header.Add(string(glue.HeaderTargetOps), ev.TargetOps)
+ header.Add(string(glue.HeaderSourceOps), ev.SourceOps)
+ header.Add(string(glue.HeaderMeta), ev.Meta)
+ header.Add(string(glue.HeaderEmittedBy), from.String())
+
+ msg := &nats.Msg{
+ Subject: helpers.Config.Stream + "." + destinationId.ToSubject().String(),
+ Reply: replyTo,
+ Header: header,
+ Data: []byte(ev.Event),
+ }
+
+ if ev.ScheduleAt.After(time.Now()) {
+ msg.Header.Add(string(glue.HeaderDelay), ev.ScheduleAt.Format(time.RFC3339))
+ }
+
+ ack, err := helpers.Js.PublishMsg(ctx, msg)
+ if err != nil {
+ helpers.LogError(err.Error())
+ return 0
+ }
+ return int64(ack.Sequence)
+
+}
+
+type Worker struct {
+ kind ids.IdKind
+ started bool
+ consumer *helpers.Consumer
+ activeId *ids.StateId
+ state *glue.StateArray
+ pendingEvents []*frankenphp.Array
+ authContext []byte
+ currentCtx context.Context
+ currentMsg jetstream.Msg
+}
+
+//export registerGoObject
+func registerGoObject(obj interface{}) C.uintptr_t {
+ handle := cgo.NewHandle(obj)
+ return C.uintptr_t(handle)
+}
+
+//export getGoObject
+func getGoObject(handle C.uintptr_t) interface{} {
+ h := cgo.Handle(handle)
+ return h.Value()
+}
+
+//export removeGoObject
+func removeGoObject(handle C.uintptr_t) {
+ h := cgo.Handle(handle)
+ h.Delete()
+}
+
+//export create_Worker_object
+func create_Worker_object() C.uintptr_t {
+ obj := &Worker{
+ kind: "api",
+ started: false,
+ consumer: nil,
+ activeId: nil,
+ state: nil,
+ pendingEvents: nil,
+ authContext: nil,
+ currentCtx: context.Background(),
+ currentMsg: nil,
+ }
+ return registerGoObject(obj)
+}
+
+func (w *Worker) __destruct() {
+ w.consumer.Msg.Stop()
+ w.consumer.Done()
+}
+
+// getNextEvent removed - logic moved to ProvideRequest method
+
+func (w *Worker) queryState(idStr *C.zend_string) unsafe.Pointer {
+ id := ids.ParseStateId(frankenphp.GoString(unsafe.Pointer(idStr)))
+ state, err := glue.GetStateArray(id, helpers.Js, w.currentCtx, helpers.Logger)
+ if err != nil {
+ helpers.LogError(err.Error())
+ return nil
+ }
+ return frankenphp.PHPArray(state.Data.Array)
+}
+
+func (w *Worker) getUser() unsafe.Pointer {
+ if provenance, ok := w.currentCtx.Value(appcontext.CurrentUserKey).(*auth.User); ok {
+ ret := &glue.Array{}
+ ret.SetString("user", string(provenance.UserId))
+ roles := &frankenphp.Array{}
+ for _, r := range provenance.Roles {
+ roles.Append(string(r))
+ }
+ ret.SetString("roles", roles)
+
+ return frankenphp.PHPArray(ret.Array)
+ }
+
+ return nil
+}
+
+func (w *Worker) setUser(userArr *C.zval) {
+ if userArr == nil {
+ w.currentCtx = context.WithValue(w.currentCtx, appcontext.CurrentUserKey, nil)
+ }
+
+ arrVal := frankenphp.GoArray(unsafe.Pointer(userArr))
+ user := helpers.GetUserContext(arrVal)
+ w.currentCtx = context.WithValue(w.currentCtx, appcontext.CurrentUserKey, user)
+}
+
+func (w *Worker) getSource() unsafe.Pointer {
+ sourceId := ids.ParseStateId(w.currentMsg.Headers().Get(string(glue.HeaderEmittedBy)))
+ return frankenphp.PHPString(sourceId.String(), false)
+}
+
+func (w *Worker) getCurrentId() unsafe.Pointer {
+ return frankenphp.PHPString(w.activeId.String(), false)
+}
+
+func (w *Worker) getCorrelationId() unsafe.Pointer {
+ return frankenphp.PHPString(w.currentCtx.Value("cid").(string), false)
+}
+
+func (w *Worker) getState() unsafe.Pointer {
+ return frankenphp.PHPArray(w.state.Data.Array)
+}
+
+func (w *Worker) updateState(state *C.zval) {
+ arr := frankenphp.GoArray(unsafe.Pointer(state))
+ w.state.Data.Array = arr
+}
+
+func (w *Worker) emitEvent(event *C.zval) {
+ arr := frankenphp.GoArray(unsafe.Pointer(event))
+ w.pendingEvents = append(w.pendingEvents, arr)
+}
+
+func (w *Worker) delete() {}
+
+//export __destruct_wrapper
+func __destruct_wrapper(handle C.uintptr_t) {
+ obj := getGoObject(handle)
+ if obj == nil {
+ return
+ }
+ structObj := obj.(*Worker)
+ structObj.__destruct()
+}
+
+// getNextEvent_wrapper removed - no longer needed
+
+//export queryState_wrapper
+func queryState_wrapper(handle C.uintptr_t, stateId *C.zend_string) unsafe.Pointer {
+ obj := getGoObject(handle)
+ if obj == nil {
+ return nil
+ }
+ structObj := obj.(*Worker)
+ return structObj.queryState(stateId)
+}
+
+//export getUser_wrapper
+func getUser_wrapper(handle C.uintptr_t) unsafe.Pointer {
+ obj := getGoObject(handle)
+ if obj == nil {
+ return nil
+ }
+ structObj := obj.(*Worker)
+ return structObj.getUser()
+}
+
+//export getSource_wrapper
+func getSource_wrapper(handle C.uintptr_t) unsafe.Pointer {
+ obj := getGoObject(handle)
+ if obj == nil {
+ return nil
+ }
+ structObj := obj.(*Worker)
+ return structObj.getSource()
+}
+
+//export getCurrentId_wrapper
+func getCurrentId_wrapper(handle C.uintptr_t) unsafe.Pointer {
+ obj := getGoObject(handle)
+ if obj == nil {
+ return nil
+ }
+ structObj := obj.(*Worker)
+ return structObj.getCurrentId()
+}
+
+//export getCorrelationId_wrapper
+func getCorrelationId_wrapper(handle C.uintptr_t) unsafe.Pointer {
+ obj := getGoObject(handle)
+ if obj == nil {
+ return nil
+ }
+ structObj := obj.(*Worker)
+ return structObj.getCorrelationId()
+}
+
+//export getState_wrapper
+func getState_wrapper(handle C.uintptr_t) unsafe.Pointer {
+ obj := getGoObject(handle)
+ if obj == nil {
+ return nil
+ }
+ structObj := obj.(*Worker)
+ return structObj.getState()
+}
+
+//export updateState_wrapper
+func updateState_wrapper(handle C.uintptr_t, state *C.zval) {
+ obj := getGoObject(handle)
+ if obj == nil {
+ return
+ }
+ structObj := obj.(*Worker)
+ structObj.updateState(state)
+}
+
+//export emitEvent_wrapper
+func emitEvent_wrapper(handle C.uintptr_t, eventDescription *C.zval) {
+ obj := getGoObject(handle)
+ if obj == nil {
+ return
+ }
+ structObj := obj.(*Worker)
+ structObj.emitEvent(eventDescription)
+}
+
+//export delete_wrapper
+func delete_wrapper(handle C.uintptr_t) {
+ obj := getGoObject(handle)
+ if obj == nil {
+ return
+ }
+ structObj := obj.(*Worker)
+ structObj.delete()
+}
+
+//export setUser_wrapper
+func setUser_wrapper(handle C.uintptr_t, user *C.zval) {
+ obj := getGoObject(handle)
+ if obj == nil {
+ return
+ }
+
+ structObj := obj.(*Worker)
+ structObj.setUser(user)
+}
+
+// SetCurrentWorker sets the current frankenphpWorker context for the PHP extension
+func SetCurrentWorker(worker *Worker) {
+ if worker == nil {
+ C.clear_current_worker()
+ return
+ }
+
+ handle := registerGoObject(worker)
+ C.set_current_worker_handle(C.uintptr_t(handle))
+}
diff --git a/cli/ext/build/ext.h b/cli/ext/build/ext.h
new file mode 100644
index 00000000..1564fd32
--- /dev/null
+++ b/cli/ext/build/ext.h
@@ -0,0 +1,13 @@
+#ifndef _EXT_H
+#define _EXT_H
+
+#include
+#include
+
+extern zend_module_entry ext_module_entry;
+
+/* Functions for managing current worker */
+void set_current_worker_handle(uintptr_t handle);
+void clear_current_worker(void);
+
+#endif
diff --git a/cli/ext/build/ext.stub.php b/cli/ext/build/ext.stub.php
new file mode 100644
index 00000000..67bcceea
--- /dev/null
+++ b/cli/ext/build/ext.stub.php
@@ -0,0 +1,38 @@
+ /home/withinboredom/code/frankenphp
+
require github.com/nats-io/nats.go v1.44.0
-require github.com/nats-io/nats-server/v2 v2.11.6
+require github.com/nats-io/nats-server/v2 v2.11.7
require github.com/teris-io/cli v1.0.1
@@ -39,9 +41,9 @@ require (
github.com/nats-io/jwt/v2 v2.7.4 // indirect
github.com/nats-io/nkeys v0.4.11 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
- github.com/oapi-codegen/runtime v1.1.1 // indirect
+ github.com/oapi-codegen/runtime v1.1.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
- github.com/prometheus/client_golang v1.22.0 // indirect
+ github.com/prometheus/client_golang v1.23.0 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.65.0 // indirect
github.com/prometheus/procfs v0.17.0 // indirect
@@ -50,6 +52,6 @@ require (
golang.org/x/crypto v0.40.0 // indirect
golang.org/x/sys v0.34.0 // indirect
golang.org/x/time v0.12.0 // indirect
- google.golang.org/protobuf v1.36.6 // indirect
+ google.golang.org/protobuf v1.36.7 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
diff --git a/cli/go.sum b/cli/go.sum
index f1c57d20..904921ce 100644
--- a/cli/go.sum
+++ b/cli/go.sum
@@ -13,8 +13,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dolthub/maphash v0.1.0 h1:bsQ7JsF4FkkWyrP3oCnFJgrCUAFbFf3kOl4L/QxPDyQ=
github.com/dolthub/maphash v0.1.0/go.mod h1:gkg4Ch4CdCDu5h6PMriVLawB7koZ+5ijb9puGMV50a4=
-github.com/dunglas/frankenphp v1.9.0 h1:tucI7uSZEmwGRGg7JxAf3wTwLrYs319mSc6fATG9z5I=
-github.com/dunglas/frankenphp v1.9.0/go.mod h1:jpmWK5Nmi2LkpgL+Td0+LQWRcQ5jVOYsuT9f+L7ohDs=
github.com/gammazero/deque v1.1.0 h1:OyiyReBbnEG2PP0Bnv1AASLIYvyKqIFN5xfl1t8oGLo=
github.com/gammazero/deque v1.1.0/go.mod h1:JVrR+Bj1NMQbPnYclvDlvSX0nVGReLrQZ0aUMuWLctg=
github.com/golang-jwt/jwt/v4 v4.5.2 h1:YtQM7lnr8iZ+j5q71MGKkNw9Mn7AjHM68uc9g5fXeUI=
@@ -48,20 +46,20 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/nats-io/jwt/v2 v2.7.4 h1:jXFuDDxs/GQjGDZGhNgH4tXzSUK6WQi2rsj4xmsNOtI=
github.com/nats-io/jwt/v2 v2.7.4/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
-github.com/nats-io/nats-server/v2 v2.11.6 h1:4VXRjbTUFKEB+7UoaKL3F5Y83xC7MxPoIONOnGgpkHw=
-github.com/nats-io/nats-server/v2 v2.11.6/go.mod h1:2xoztlcb4lDL5Blh1/BiukkKELXvKQ5Vy29FPVRBUYs=
+github.com/nats-io/nats-server/v2 v2.11.7 h1:lINWQ/Hb3cnaoHmWTjj/7WppZnaSh9C/1cD//nHCbms=
+github.com/nats-io/nats-server/v2 v2.11.7/go.mod h1:DchDPVzAsAPqhqm7VLedX0L7hjnV/SYtlmsl9F8U53s=
github.com/nats-io/nats.go v1.44.0 h1:ECKVrDLdh/kDPV1g0gAQ+2+m2KprqZK5O/eJAyAnH2M=
github.com/nats-io/nats.go v1.44.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
-github.com/oapi-codegen/runtime v1.1.1 h1:EXLHh0DXIJnWhdRPN2w4MXAzFyE4CskzhNLUmtpMYro=
-github.com/oapi-codegen/runtime v1.1.1/go.mod h1:SK9X900oXmPWilYR5/WKPzt3Kqxn/uS/+lbpREv+eCg=
+github.com/oapi-codegen/runtime v1.1.2 h1:P2+CubHq8fO4Q6fV1tqDBZHCwpVpvPg7oKiYzQgXIyI=
+github.com/oapi-codegen/runtime v1.1.2/go.mod h1:SK9X900oXmPWilYR5/WKPzt3Kqxn/uS/+lbpREv+eCg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q=
-github.com/prometheus/client_golang v1.22.0/go.mod h1:R7ljNsLXhuQXYZYtw6GAE9AZg8Y7vEW5scdCXrWRXC0=
+github.com/prometheus/client_golang v1.23.0 h1:ust4zpdl9r4trLY/gSjlm07PuiBq2ynaXXlptpfy8Uc=
+github.com/prometheus/client_golang v1.23.0/go.mod h1:i/o0R9ByOnHX0McrTMTyhYvKE4haaf2mW08I+jGAjEE=
github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=
github.com/prometheus/common v0.65.0 h1:QDwzd+G1twt//Kwj/Ww6E9FQq1iVMmODnILtW1t2VzE=
@@ -104,6 +102,8 @@ golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE=
golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
+google.golang.org/protobuf v1.36.7 h1:IgrO7UwFQGJdRNXH/sQux4R1Dj1WAKcLElzeeRaXV2A=
+google.golang.org/protobuf v1.36.7/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
diff --git a/cli/lib/api.go b/cli/lib/api.go
index 90456c10..2a6ce2c1 100644
--- a/cli/lib/api.go
+++ b/cli/lib/api.go
@@ -2,12 +2,12 @@ package lib
import (
"context"
- "durable_php/auth"
- "durable_php/config"
- "durable_php/glue"
- "durable_php/ids"
"encoding/json"
"fmt"
+ "github.com/bottledcode/durable-php/cli/auth"
+ "github.com/bottledcode/durable-php/cli/config"
+ "github.com/bottledcode/durable-php/cli/glue"
+ "github.com/bottledcode/durable-php/cli/ids"
"github.com/dunglas/frankenphp"
"github.com/google/uuid"
"github.com/gorilla/mux"
@@ -36,7 +36,7 @@ func generateCorrelationId() string {
return string(bytes)
}
-func getCorrelationId(ctx context.Context, hHeaders *http.Header, nHeaders *nats.Header) context.Context {
+func GetCorrelationId(ctx context.Context, hHeaders *http.Header, nHeaders *nats.Header) context.Context {
if ctx.Value("cid") != nil {
return ctx
}
@@ -97,7 +97,7 @@ func Startup(ctx context.Context, js jetstream.JetStream, logger *zap.Logger, po
}
request.Header.Add("DPHP_BOOTSTRAP", config.Bootstrap)
- ctx := getCorrelationId(ctx, &request.Header, nil)
+ ctx := GetCorrelationId(ctx, &request.Header, nil)
logRequest(logger, request, ctx)
request, err := frankenphp.NewRequestWithContext(request, frankenphp.WithRequestEnv(map[string]string{
@@ -134,7 +134,7 @@ func Startup(ctx context.Context, js jetstream.JetStream, logger *zap.Logger, po
return
}
- ctx := getCorrelationId(ctx, &request.Header, nil)
+ ctx := GetCorrelationId(ctx, &request.Header, nil)
logRequest(logger, request, ctx)
store, err := glue.GetObjectStore("activities", js, context.Background())
@@ -161,7 +161,7 @@ func Startup(ctx context.Context, js jetstream.JetStream, logger *zap.Logger, po
return
}
- ctx := getCorrelationId(ctx, &request.Header, nil)
+ ctx := GetCorrelationId(ctx, &request.Header, nil)
logRequest(logger, request, ctx)
vars := mux.Vars(request)
@@ -198,7 +198,7 @@ func Startup(ctx context.Context, js jetstream.JetStream, logger *zap.Logger, po
http.Error(writer, "Page should be integer", http.StatusBadRequest)
}
- ctx := getCorrelationId(ctx, &request.Header, nil)
+ ctx := GetCorrelationId(ctx, &request.Header, nil)
logRequest(logger, request, ctx)
if len(config.Extensions.Search.Collections) == 0 {
@@ -338,7 +338,7 @@ func Startup(ctx context.Context, js jetstream.JetStream, logger *zap.Logger, po
return
}
- ctx := getCorrelationId(ctx, &request.Header, nil)
+ ctx := GetCorrelationId(ctx, &request.Header, nil)
logRequest(logger, request, ctx)
vars := mux.Vars(request)
@@ -392,7 +392,7 @@ func Startup(ctx context.Context, js jetstream.JetStream, logger *zap.Logger, po
return
}
- ctx := getCorrelationId(ctx, &request.Header, nil)
+ ctx := GetCorrelationId(ctx, &request.Header, nil)
logRequest(logger, request, ctx)
vars := mux.Vars(request)
@@ -470,7 +470,7 @@ func Startup(ctx context.Context, js jetstream.JetStream, logger *zap.Logger, po
return
}
- ctx := getCorrelationId(ctx, &request.Header, nil)
+ ctx := GetCorrelationId(ctx, &request.Header, nil)
logRequest(logger, request, ctx)
vars := mux.Vars(request)
@@ -531,7 +531,7 @@ func Startup(ctx context.Context, js jetstream.JetStream, logger *zap.Logger, po
Id: strings.TrimSpace(vars["id"]),
}
- ctx := getCorrelationId(ctx, &request.Header, nil)
+ ctx := GetCorrelationId(ctx, &request.Header, nil)
logRequest(logger, request, ctx)
if request.Method == "GET" {
@@ -611,7 +611,7 @@ func Startup(ctx context.Context, js jetstream.JetStream, logger *zap.Logger, po
return
}
- ctx := getCorrelationId(ctx, &request.Header, nil)
+ ctx := GetCorrelationId(ctx, &request.Header, nil)
logRequest(logger, request, ctx)
http.Error(writer, "Method Not Allowed", http.StatusMethodNotAllowed)
@@ -630,7 +630,7 @@ func Startup(ctx context.Context, js jetstream.JetStream, logger *zap.Logger, po
return
}
- ctx := getCorrelationId(ctx, &request.Header, nil)
+ ctx := GetCorrelationId(ctx, &request.Header, nil)
logRequest(logger, request, ctx)
vars := mux.Vars(request)
@@ -666,7 +666,7 @@ func Startup(ctx context.Context, js jetstream.JetStream, logger *zap.Logger, po
return
}
- ctx := getCorrelationId(ctx, &request.Header, nil)
+ ctx := GetCorrelationId(ctx, &request.Header, nil)
logRequest(logger, request, ctx)
vars := mux.Vars(request)
@@ -720,7 +720,7 @@ func Startup(ctx context.Context, js jetstream.JetStream, logger *zap.Logger, po
return
}
- ctx := getCorrelationId(ctx, &request.Header, nil)
+ ctx := GetCorrelationId(ctx, &request.Header, nil)
logRequest(logger, request, ctx)
vars := mux.Vars(request)
@@ -798,7 +798,7 @@ func Startup(ctx context.Context, js jetstream.JetStream, logger *zap.Logger, po
return
}
- ctx := getCorrelationId(ctx, &request.Header, nil)
+ ctx := GetCorrelationId(ctx, &request.Header, nil)
logRequest(logger, request, ctx)
vars := mux.Vars(request)
@@ -854,7 +854,7 @@ func Startup(ctx context.Context, js jetstream.JetStream, logger *zap.Logger, po
vars := mux.Vars(request)
- ctx := getCorrelationId(ctx, &request.Header, nil)
+ ctx := GetCorrelationId(ctx, &request.Header, nil)
logRequest(logger, request, ctx)
id := &ids.OrchestrationId{
@@ -984,7 +984,7 @@ func Startup(ctx context.Context, js jetstream.JetStream, logger *zap.Logger, po
return
}
- ctx := getCorrelationId(ctx, &request.Header, nil)
+ ctx := GetCorrelationId(ctx, &request.Header, nil)
logRequest(logger, request, ctx)
vars := mux.Vars(request)
@@ -1007,7 +1007,7 @@ func Startup(ctx context.Context, js jetstream.JetStream, logger *zap.Logger, po
r.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
logger.Warn("Unknown endpoint")
- ctx := getCorrelationId(ctx, &request.Header, nil)
+ ctx := GetCorrelationId(ctx, &request.Header, nil)
logRequest(logger, request, ctx)
})
diff --git a/cli/lib/billing.go b/cli/lib/billing.go
index c19a3422..a1a23187 100644
--- a/cli/lib/billing.go
+++ b/cli/lib/billing.go
@@ -2,11 +2,11 @@ package lib
import (
"context"
- "durable_php/config"
- "durable_php/glue"
- "durable_php/ids"
"encoding/json"
"fmt"
+ "github.com/bottledcode/durable-php/cli/config"
+ "github.com/bottledcode/durable-php/cli/glue"
+ "github.com/bottledcode/durable-php/cli/ids"
"github.com/nats-io/nats.go/jetstream"
"go.uber.org/zap"
"time"
diff --git a/cli/lib/consumer.go b/cli/lib/consumer.go
index c68dfb81..099cbb1d 100644
--- a/cli/lib/consumer.go
+++ b/cli/lib/consumer.go
@@ -2,21 +2,38 @@ package lib
import (
"context"
- "durable_php/appcontext"
- "durable_php/auth"
- "durable_php/config"
- "durable_php/glue"
- "durable_php/ids"
"encoding/json"
"fmt"
- "github.com/nats-io/nats.go/jetstream"
- "go.uber.org/zap"
"net/http"
"runtime"
"strings"
"time"
+
+ "github.com/bottledcode/durable-php/cli/appcontext"
+ "github.com/bottledcode/durable-php/cli/auth"
+ "github.com/bottledcode/durable-php/cli/config"
+ "github.com/bottledcode/durable-php/cli/glue"
+ "github.com/bottledcode/durable-php/cli/ids"
+ "github.com/nats-io/nats.go/jetstream"
+ "go.uber.org/zap"
)
+func StartConsumer(ctx context.Context, config *config.Config, stream jetstream.Stream, logger *zap.Logger, kind ids.IdKind) jetstream.MessagesContext {
+ logger.Debug("Starting consumer", zap.String("stream", config.Stream), zap.String("kind", string(kind)))
+
+ consumer, err := stream.Consumer(ctx, config.Stream+"-"+string(kind))
+ if err != nil {
+ panic(err)
+ }
+
+ iter, err := consumer.Messages(jetstream.PullMaxMessages(10), jetstream.WithMessagesErrOnMissingHeartbeat(false))
+ if err != nil {
+ panic(err)
+ }
+
+ return iter
+}
+
func BuildConsumer(stream jetstream.Stream, ctx context.Context, config *config.Config, kind ids.IdKind, logger *zap.Logger, js jetstream.JetStream, rm *auth.ResourceManager) {
logger.Debug("Creating consumer", zap.String("stream", config.Stream), zap.String("kind", string(kind)))
@@ -64,7 +81,7 @@ func BuildConsumer(stream jetstream.Stream, ctx context.Context, config *config.
return
}
- ctx := getCorrelationId(ctx, nil, &headers)
+ ctx := GetCorrelationId(ctx, nil, &headers)
// spawn a thread to process the message, but rate limit
go func() {
@@ -93,20 +110,267 @@ func BuildConsumer(stream jetstream.Stream, ctx context.Context, config *config.
}()
}
+func getStateId(msg jetstream.Msg) *ids.StateId {
+ return ids.ParseStateId(msg.Headers().Get(string(glue.HeaderStateId)))
+}
+
+func lockStateId(ctx context.Context, id *ids.StateId, js jetstream.JetStream, logger *zap.Logger) (func() error, error) {
+ if id.Kind != ids.Entity {
+ return func() error { return nil }, nil
+ }
+
+ unlocker, err := lockSubject(ctx, id.ToSubject(), js, logger)
+ if err != nil {
+ return func() error { return nil }, err
+ }
+ return unlocker, nil
+}
+
+func getUserFromHeader(msg jetstream.Msg) (*auth.User, error) {
+ r := &auth.User{}
+ b := msg.Headers().Get(string(glue.HeaderProvenance))
+ err := json.Unmarshal([]byte(b), r)
+ if err != nil {
+ return nil, err
+ }
+ return r, nil
+}
+
+// ProcessMessage takes a message and some references and returns:
+// 1. an auth context
+// 2. the destination id
+// 3. the state
+// 4. or an error
+func ProcessMessage(
+ ctx context.Context,
+ logger *zap.Logger,
+ msg jetstream.Msg,
+ rm *auth.ResourceManager,
+ config *config.Config,
+ js jetstream.JetStream,
+) ([]byte, *ids.StateId, *glue.StateArray, error) {
+ logger.Debug("Processing message", zap.Any("msg", msg))
+
+ id := getStateId(msg)
+ unlocker, err := lockStateId(ctx, id, js, logger)
+ if err != nil {
+ return []byte{}, nil, nil, err
+ }
+ defer unlocker()
+
+ ctx, cancelCtx := context.WithCancel(ctx)
+ defer cancelCtx()
+
+ currentUser, err := getUserFromHeader(msg)
+ if err != nil {
+ logger.Warn("Failed to unmarshal event provenance",
+ zap.Any("Provenance", msg.Headers().Get(string(glue.HeaderProvenance))),
+ zap.Error(err),
+ )
+ currentUser = nil
+ } else {
+ ctx = auth.DecorateContextWithUser(ctx, currentUser)
+ }
+
+ // retrieve the source
+ sourceId := ids.ParseStateId(msg.Headers().Get(string(glue.HeaderEmittedBy)))
+ var authContext []byte
+
+ if config.Extensions.Authz.Enabled {
+ // extract the source operations
+ sourceOps := strings.Split(msg.Headers().Get(string(glue.HeaderSourceOps)), ",")
+
+ // extract the target operations
+ targetOps := strings.Split(msg.Headers().Get(string(glue.HeaderTargetOps)), ",")
+ preventCreation := true
+ for _, op := range targetOps {
+ switch auth.Operation(op) {
+ case auth.Signal:
+ fallthrough
+ case auth.Call:
+ fallthrough
+ case auth.Lock:
+ fallthrough
+ case auth.Output:
+ preventCreation = false
+ }
+ }
+
+ resource, err := rm.DiscoverResource(ctx, id, sourceId, logger, preventCreation)
+ if err != nil {
+ logger.Warn("User attempted to perform an unauthorized operation", zap.String("operation", "create"), zap.String("From", sourceId.Id), zap.String("To", id.Id), zap.String("User", string(currentUser.UserId)))
+ msg.Ack()
+ return []byte{}, nil, nil, err
+ }
+ if resource == nil {
+ logger.Warn("User accessed missing object", zap.Any("operation", sourceOps), zap.String("from", sourceId.Id), zap.String("to", id.Id), zap.String("user", string(currentUser.UserId)))
+ msg.Ack()
+ return []byte{}, nil, nil, nil
+ }
+
+ authContext, err = rm.ToAuthContext(ctx, resource)
+ if err != nil {
+ logger.Warn("Failed to retrieve auth context", zap.Error(err))
+ msg.Ack()
+ return []byte{}, nil, nil, err
+ }
+
+ m := msg.Headers().Get(string(glue.HeaderMeta))
+ var meta map[string]interface{}
+ if m != "[]" {
+ err = json.Unmarshal([]byte(m), &meta)
+ if err != nil {
+ return []byte{}, nil, nil, err
+ }
+
+ switch msg.Headers().Get(string(glue.HeaderEventType)) {
+ case "RevokeRole":
+ if !resource.WantTo(auth.ShareMinus, ctx) {
+ logger.Warn("User attempted to perform an unauthorized operation", zap.String("operation", "revokeRole"), zap.String("From", sourceId.Id), zap.String("To", id.Id), zap.String("User", string(currentUser.UserId)))
+ msg.Ack()
+ return []byte{}, nil, nil, nil
+ }
+ role := meta["role"].(string)
+
+ err := resource.RevokeRole(auth.Role(role), ctx)
+ if err != nil {
+ return []byte{}, nil, nil, err
+ }
+ err = resource.Update(ctx, logger)
+ if err != nil {
+ return []byte{}, nil, nil, err
+ }
+ msg.Ack()
+ return []byte{}, nil, nil, nil
+ case "RevokeUser":
+ if !resource.WantTo(auth.ShareMinus, ctx) {
+ logger.Warn("User attempted to perform an unauthorized operation", zap.String("operation", "revokeUser"), zap.String("From", sourceId.Id), zap.String("To", id.Id), zap.String("User", string(currentUser.UserId)))
+ msg.Ack()
+ return []byte{}, nil, nil, nil
+ }
+ user := meta["userId"].(string)
+ err := resource.RevokeUser(auth.UserId(user), ctx)
+ if err != nil {
+ return []byte{}, nil, nil, err
+ }
+ err = resource.Update(ctx, logger)
+ if err != nil {
+ return []byte{}, nil, nil, err
+ }
+ msg.Ack()
+ return []byte{}, nil, nil, nil
+ case "ShareWithRole":
+ if !resource.WantTo(auth.SharePlus, ctx) {
+ logger.Warn("User attempted to perform an unauthorized operation", zap.String("operation", "shareWithRole"), zap.String("From", sourceId.Id), zap.String("To", id.Id), zap.String("User", string(currentUser.UserId)))
+ msg.Ack()
+ return []byte{}, nil, nil, nil
+ }
+ role := meta["role"].(auth.Role)
+ operations := meta["allowedOperations"].([]auth.Operation)
+
+ for _, op := range operations {
+ err := resource.GrantRole(role, op, ctx)
+ if err != nil {
+ return []byte{}, nil, nil, err
+ }
+ }
+ err = resource.Update(ctx, logger)
+ if err != nil {
+ return []byte{}, nil, nil, err
+ }
+ msg.Ack()
+ return []byte{}, nil, nil, nil
+ case "ShareWithUser":
+ if !resource.WantTo(auth.SharePlus, ctx) {
+ logger.Warn("User attempted to perform an unauthorized operation", zap.String("operation", "shareWithUser"), zap.String("From", sourceId.Id), zap.String("To", id.Id), zap.String("User", string(currentUser.UserId)))
+ msg.Ack()
+ return []byte{}, nil, nil, nil
+ }
+ role := meta["userId"].(auth.UserId)
+ operations := meta["allowedOperations"].([]auth.Operation)
+
+ for _, op := range operations {
+ err := resource.GrantUser(role, op, ctx)
+ if err != nil {
+ return []byte{}, nil, nil, err
+ }
+ }
+ err = resource.Update(ctx, logger)
+ if err != nil {
+ return []byte{}, nil, nil, err
+ }
+ msg.Ack()
+ return []byte{}, nil, nil, nil
+ case "ShareOwnership":
+ if !resource.WantTo(auth.Owner, ctx) {
+ logger.Warn("User attempted to perform an unauthorized operation", zap.String("operation", "shareOwnership"), zap.String("From", sourceId.Id), zap.String("To", id.Id), zap.String("User", string(currentUser.UserId)))
+ msg.Ack()
+ return []byte{}, nil, nil, nil
+ }
+ userId := meta["userId"].(auth.UserId)
+ user := ctx.Value(appcontext.CurrentUserKey).(*auth.User)
+ err := resource.ShareOwnership(userId, user, true)
+ if err != nil {
+ return []byte{}, nil, nil, err
+ }
+ err = resource.Update(ctx, logger)
+ if err != nil {
+ return []byte{}, nil, nil, err
+ }
+ msg.Ack()
+ return []byte{}, nil, nil, nil
+ case "GiveOwnership":
+ if !resource.WantTo(auth.Owner, ctx) {
+ logger.Warn("User attempted to perform an unauthorized operation", zap.String("operation", "giveOwnership"), zap.String("From", sourceId.Id), zap.String("To", id.Id), zap.String("User", string(currentUser.UserId)))
+ msg.Ack()
+ return []byte{}, nil, nil, nil
+ }
+ userId := meta["userId"].(auth.UserId)
+ user := ctx.Value(appcontext.CurrentUserKey).(*auth.User)
+ err := resource.ShareOwnership(userId, user, true)
+ if err != nil {
+ return []byte{}, nil, nil, err
+ }
+ err = resource.Update(ctx, logger)
+ if err != nil {
+ return []byte{}, nil, nil, err
+ }
+ msg.Ack()
+ return []byte{}, nil, nil, nil
+ }
+ }
+
+ for _, op := range targetOps {
+ if !resource.WantTo(auth.Operation(op), ctx) {
+ logger.Warn("User attempted to perform an unauthorized operation", zap.String("operation", op), zap.String("From", sourceId.Id), zap.String("To", id.Id), zap.String("User", string(currentUser.UserId)))
+ msg.Ack()
+ return []byte{}, nil, nil, nil
+ }
+ }
+ }
+
+ state, err := glue.GetStateArray(id, js, ctx, logger)
+ if err != nil {
+ logger.Warn("Failed to retrieve state", zap.Error(err))
+ msg.Ack()
+ return []byte{}, nil, nil, err
+ }
+
+ return authContext, id, state, nil
+}
+
// processMsg is responsible for processing a message received from JetStream.
// It takes a logger, msg, and JetStream as parameters. Do not panic!
func processMsg(ctx context.Context, logger *zap.Logger, msg jetstream.Msg, js jetstream.JetStream, config *config.Config, rm *auth.ResourceManager) error {
logger.Debug("Received message", zap.Any("msg", msg))
// lock the Subject, if it is a lockable Subject
- id := ids.ParseStateId(msg.Headers().Get(string(glue.HeaderStateId)))
- if id.Kind == ids.Entity {
- unlocker, err := lockSubject(ctx, id.ToSubject(), js, logger)
- if err != nil {
- return err
- }
- defer unlocker()
+ id := getStateId(msg)
+ unlocker, err := lockStateId(ctx, id, nil, logger)
+ if err != nil {
+ return err
}
+ defer unlocker()
ctx, cancelCtx := context.WithCancel(ctx)
defer cancelCtx()
@@ -114,7 +378,7 @@ func processMsg(ctx context.Context, logger *zap.Logger, msg jetstream.Msg, js j
// configure the current user
currentUser := &auth.User{}
b := msg.Headers().Get(string(glue.HeaderProvenance))
- err := json.Unmarshal([]byte(b), currentUser)
+ err = json.Unmarshal([]byte(b), currentUser)
if err != nil {
logger.Warn("Failed to unmarshal event provenance",
zap.Any("Provenance", msg.Headers().Get(string(glue.HeaderProvenance))),
diff --git a/cli/lib/index.go b/cli/lib/index.go
index 0f5e11ad..83755fd5 100644
--- a/cli/lib/index.go
+++ b/cli/lib/index.go
@@ -2,7 +2,7 @@ package lib
import (
"context"
- "durable_php/config"
+ "github.com/bottledcode/durable-php/cli/config"
"github.com/typesense/typesense-go/typesense"
"github.com/typesense/typesense-go/typesense/api"
"github.com/typesense/typesense-go/typesense/api/pointer"
diff --git a/cli/lib/indexer.go b/cli/lib/indexer.go
index 7cfe0b3f..2027178d 100644
--- a/cli/lib/indexer.go
+++ b/cli/lib/indexer.go
@@ -2,10 +2,10 @@ package lib
import (
"context"
- "durable_php/config"
- "durable_php/glue"
- "durable_php/ids"
"encoding/json"
+ "github.com/bottledcode/durable-php/cli/config"
+ "github.com/bottledcode/durable-php/cli/glue"
+ "github.com/bottledcode/durable-php/cli/ids"
"github.com/nats-io/nats.go/jetstream"
"github.com/typesense/typesense-go/typesense"
"github.com/typesense/typesense-go/typesense/api"
diff --git a/cli/lib/locks.go b/cli/lib/locks.go
index 2b72b41f..225ed98e 100644
--- a/cli/lib/locks.go
+++ b/cli/lib/locks.go
@@ -2,8 +2,8 @@ package lib
import (
"context"
- "durable_php/ids"
"errors"
+ "github.com/bottledcode/durable-php/cli/ids"
"github.com/nats-io/nats.go/jetstream"
"go.uber.org/zap"
"time"
diff --git a/cli/test.http b/cli/test.http
new file mode 100644
index 00000000..bd6e575c
--- /dev/null
+++ b/cli/test.http
@@ -0,0 +1 @@
+GET localhost:8080/test.php
\ No newline at end of file
diff --git a/cli/test.php b/cli/test.php
new file mode 100644
index 00000000..95dd5a95
--- /dev/null
+++ b/cli/test.php
@@ -0,0 +1,26 @@
+toArray(), StateId::fromEntityId(EntityId('test', 'test')));
+
+// echo emit_event(['userId' => 'bob', 'roles' => ['admin']], [], 'activity:bullshit');
diff --git a/composer.json b/composer.json
index 3cf9fb97..54c8fb99 100644
--- a/composer.json
+++ b/composer.json
@@ -40,7 +40,8 @@
"ramsey/uuid": "^4.9.0",
"webonyx/graphql-php": "^15.22.0",
"withinboredom/records": "^0.1.3",
- "withinboredom/time": "^6.0.0"
+ "withinboredom/time": "^6.0.0",
+ "ext-frankenphp": "*"
},
"require-dev": {
"laravel/pint": "^1.24.0",
diff --git a/frankenphp-ext/ext/client.go b/frankenphp-ext/ext/client.go
new file mode 100644
index 00000000..32e86273
--- /dev/null
+++ b/frankenphp-ext/ext/client.go
@@ -0,0 +1,34 @@
+package ext
+
+import (
+ "C"
+ "github.com/dunglas/frankenphp"
+ "strings"
+ "unsafe"
+)
+
+func init() {
+ frankenphp.RegisterExtension(unsafe.Pointer(&C.ext_module_entry))
+}
+
+// export_php:module init
+func moduleInit() {
+ // this is during init
+}
+
+// export_php:function repeat_this(string $id, string $str, int $count, bool $reverse): string
+func repeat_this(idStr *C.zend_string, s *C.zend_string, count int, reverse bool) unsafe.Pointer {
+ str := frankenphp.GoString(unsafe.Pointer(s))
+
+ result := strings.Repeat(str, count)
+
+ if reverse {
+ runes := []rune(result)
+ for i, j := 0, len(runes)-1; i < j; i, j = i+1, j-1 {
+ runes[i], runes[j] = runes[j], runes[i]
+ }
+ result = string(runes)
+ }
+
+ return frankenphp.PHPString(result, false)
+}
diff --git a/src/DurableClient.php b/src/DurableClient.php
index df594a24..82299911 100644
--- a/src/DurableClient.php
+++ b/src/DurableClient.php
@@ -26,6 +26,9 @@
use Amp\Http\Client\HttpClientBuilder;
use Bottledcode\DurablePhp\Events\Shares\Operation;
+use Bottledcode\DurablePhp\Ext\Worker;
+use Bottledcode\DurablePhp\Glue\Provenance;
+use Bottledcode\DurablePhp\Proxy\SpyProxy;
use Bottledcode\DurablePhp\Search\EntityFilter;
use Bottledcode\DurablePhp\State\EntityId;
use Bottledcode\DurablePhp\State\EntityState;
@@ -43,7 +46,25 @@ public function __construct(
private OrchestrationClientInterface $orchestrationClient,
) {}
- public static function get(string $apiHost = 'http://localhost:8080'): self
+ public static function local(?Provenance $userContext = null): self
+ {
+ $worker = new Worker();
+ $entityClient = new LocalEntityClient(new SpyProxy(), $worker);
+ $orchestrationClient = new LocalOrchestrationClient(new SpyProxy(), $worker);
+ $entityClient->withAuth($userContext);
+ $orchestrationClient->withAuth($userContext);
+
+ return new self($entityClient, $orchestrationClient);
+ }
+
+ #[Override]
+ public function withAuth(Provenance|string|null $token): void
+ {
+ $this->orchestrationClient->withAuth($token);
+ $this->entityClient->withAuth($token);
+ }
+
+ public static function remote(string $apiHost = 'http://localhost:8080'): self
{
$builder = new HttpClientBuilder();
$builder->retry(3);
@@ -132,13 +153,6 @@ public function signal(EntityId|string $entityId, Closure $signal): void
$this->entityClient->signal($entityId, $signal);
}
- #[Override]
- public function withAuth(string $token): void
- {
- $this->orchestrationClient->withAuth($token);
- $this->entityClient->withAuth($token);
- }
-
public function deleteEntity(EntityId $entityId): void
{
$this->entityClient->deleteEntity($entityId);
diff --git a/src/EntityClientInterface.php b/src/EntityClientInterface.php
index e2e6e5b2..70fb6605 100644
--- a/src/EntityClientInterface.php
+++ b/src/EntityClientInterface.php
@@ -25,6 +25,7 @@
namespace Bottledcode\DurablePhp;
use Bottledcode\DurablePhp\Events\Shares\Operation;
+use Bottledcode\DurablePhp\Glue\Provenance;
use Bottledcode\DurablePhp\Search\EntityFilter;
use Bottledcode\DurablePhp\State\EntityId;
use Bottledcode\DurablePhp\State\EntityState;
@@ -34,7 +35,7 @@
interface EntityClientInterface
{
- public function withAuth(string $token): void;
+ public function withAuth(Provenance|string|null $token): void;
/**
* Removes empty entities and releases orphaned locks
diff --git a/src/Events/EventDescription.php b/src/Events/EventDescription.php
index 3685439e..8a13144f 100644
--- a/src/Events/EventDescription.php
+++ b/src/Events/EventDescription.php
@@ -29,18 +29,23 @@
use Bottledcode\DurablePhp\Events\Shares\Operation;
use Bottledcode\DurablePhp\State\Ids\StateId;
use Bottledcode\DurablePhp\State\Serializer;
+use Crell\Serde\Attributes\Field;
use DateTimeImmutable;
use JsonException;
use ReflectionClass;
readonly class EventDescription
{
+ #[Field(default: null, omitIfNull: true)]
public ?StateId $replyTo;
+ #[Field(serializedName: 'scheduleAt', default: null, omitIfNull: true)]
public ?DateTimeImmutable $scheduledAt;
+ #[Field(default: null, omitIfNull: true)]
public ?StateId $destination;
+ #[Field(default: null, omitIfNull: true)]
public ?StateId $from;
public string $eventId;
@@ -180,6 +185,11 @@ public static function fromJson(string $json): EventDescription
}
public function toStream(): string
+ {
+ return json_encode($this->toArray(), JSON_THROW_ON_ERROR | JSON_UNESCAPED_SLASHES);
+ }
+
+ public function toArray(): array
{
$serialized = Serializer::serialize($this->event);
@@ -189,7 +199,7 @@ function_exists('igbinary_serialize') ? igbinary_serialize($serialized) : serial
$event = base64_encode($serialized);
- return json_encode([
+ return [
'destination' => $this->destination?->id ?? null,
'replyTo' => $this->replyTo?->id ?? '',
'scheduleAt' => $this->scheduledAt?->format(DATE_ATOM) ?? gmdate(DATE_ATOM, time() - 30),
@@ -200,7 +210,7 @@ function_exists('igbinary_serialize') ? igbinary_serialize($serialized) : serial
'targetOps' => implode(',', array_map(static fn($x) => $x->value, $this->targetOperations)),
'meta' => json_encode($this->meta ?? [], JSON_THROW_ON_ERROR),
'event' => $event,
- ], JSON_THROW_ON_ERROR | JSON_UNESCAPED_SLASHES);
+ ];
}
/**
diff --git a/src/Glue/glue.php b/src/Glue/Glue.php
similarity index 75%
rename from src/Glue/glue.php
rename to src/Glue/Glue.php
index 99bf8c71..bff2937e 100644
--- a/src/Glue/glue.php
+++ b/src/Glue/Glue.php
@@ -30,6 +30,7 @@
use Bottledcode\DurablePhp\Events\StartExecution;
use Bottledcode\DurablePhp\Events\WithEntity;
use Bottledcode\DurablePhp\Events\WithOrchestration;
+use Bottledcode\DurablePhp\Ext\Worker;
use Bottledcode\DurablePhp\SerializedArray;
use Bottledcode\DurablePhp\State\ActivityHistory;
use Bottledcode\DurablePhp\State\Attributes\AllowCreateAll;
@@ -54,7 +55,6 @@
use DI\Definition\InstanceDefinition;
use DI\Definition\ObjectDefinition;
use JsonException;
-use LogicException;
use Ramsey\Uuid\Uuid;
use ReflectionClass;
use ReflectionFunction;
@@ -62,8 +62,6 @@
use function Bottledcode\DurablePhp\OrchestrationInstance;
-require_once __DIR__ . '/autoload.php';
-
class Glue
{
public readonly ?string $bootstrap;
@@ -72,58 +70,52 @@ class Glue
public StateId $source;
- public $payloadHandle;
-
public array $payload = [];
public ?Provenance $provenance;
private string $method;
- private $streamHandle;
-
- private array $queries = [];
+ private ?Worker $worker = null;
public function __construct(private DurableLogger $logger)
{
- $this->target = StateId::fromString($_SERVER['STATE_ID']);
- $this->bootstrap = $_SERVER['HTTP_DPHP_BOOTSTRAP'] ?: null;
- $this->method = $_SERVER['HTTP_DPHP_FUNCTION'];
- try {
- $provenance = json_decode($_SERVER['HTTP_DPHP_PROVENANCE'] ?? 'null', true, 32, JSON_THROW_ON_ERROR);
- if (! $provenance || $provenance === ['userId' => '', 'roles' => null]) {
- $this->provenance = null;
- } else {
- $provenance['roles'] ??= [];
- $this->provenance = Serializer::deserialize($provenance, Provenance::class);
- }
- } catch (JsonException $e) {
- $this->logger->alert(
- 'Failed to capture provenance',
- ['provenance' => $_SERVER['HTTP_DPHP_PROVENANCE'] ?? null],
- );
- $this->provenance = null;
+ // Try to get the current worker from the extension
+ if (class_exists(Worker::class)) {
+ $this->worker = Worker::GetCurrent();
}
- $this->source = StateId::fromString($_SERVER['HTTP_DPHP_SOURCE']);
- if (! file_exists($_SERVER['HTTP_DPHP_PAYLOAD'])) {
- throw new LogicException('Unable to load payload');
- }
+ // If we have a worker, we can get context from it directly
+ if ($this->worker) {
+ $this->target = StateId::fromString($this->worker->getCurrentId());
+ $this->source = StateId::fromString($this->worker->getSource());
- $payload = stream_get_contents($this->payloadHandle = fopen($_SERVER['HTTP_DPHP_PAYLOAD'], 'r+b'));
- try {
- $this->payload = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
- } catch (JsonException) {
- $this->payload = [];
- }
- $this->logger->debug('Got payload', ['raw' => $payload, 'parsed' => $this->payload]);
+ // Get bootstrap from HTTP headers (set by Go runtime)
+ $this->bootstrap = $_SERVER['HTTP_DPHP_BOOTSTRAP'] ?? null;
+ $this->method = $_SERVER['HTTP_DPHP_FUNCTION'] ?? 'processMsg';
- $this->streamHandle = fopen('php://input', 'r+b');
- }
+ // Get user context from worker
+ $user = $this->worker->getUser();
+ if ($user) {
+ $this->provenance = new Provenance($user['user'] ?? '', $user['roles'] ?? []);
+ } else {
+ $this->provenance = null;
+ }
- public function __destruct()
- {
- fclose($this->payloadHandle);
+ // Get event payload from HTTP request body (sent by ProvideRequest)
+ $eventData = file_get_contents('php://input');
+ if ($eventData) {
+ try {
+ $this->payload = json_decode($eventData, true, 512, JSON_THROW_ON_ERROR);
+ } catch (JsonException) {
+ $this->payload = [];
+ }
+ } else {
+ $this->payload = [];
+ }
+
+ $this->logger->debug('Got payload from worker', ['parsed' => $this->payload]);
+ }
}
public function process(): void
@@ -133,29 +125,12 @@ public function process(): void
public function queryState(StateId $id): ?StateInterface
{
- $this->queries[] = true;
- echo implode('~!~', ['QUERY', $id->id, $qid = count($this->queries)]);
-
- while (true) {
- $result = fgets($this->streamHandle);
- if (str_starts_with($result, "{$qid}://")) {
- $file = explode('//', $result)[1];
-
- return $this->readStateFile($file);
- }
- }
- }
-
- private function readStateFile($resource): ?StateInterface
- {
- $payload = stream_get_contents($resource);
- try {
- $payload = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
- } catch (JsonException) {
+ $state = $this->worker->queryState($id->id);
+ if (empty($state)) {
return null;
}
- return Serializer::deserialize($payload, StateInterface::class);
+ return Serializer::deserialize($state, StateInterface::class);
}
public function processMsg(): void
@@ -176,7 +151,7 @@ public function bootstrap(): Container
public function outputDelete(): void
{
- echo 'DELETE~!~';
+ $this->worker->delete();
}
private function entitySignal(): void
@@ -189,9 +164,7 @@ private function entitySignal(): void
public function outputEvent(EventDescription $event): void
{
- // determine access level
-
- echo 'EVENT~!~' . mb_trim($event->toStream()) . "\n";
+ $this->worker->emitEvent($event->toArray());
}
private function startOrchestration(): void
@@ -213,18 +186,6 @@ private function startOrchestration(): void
$actualId = $this->target->toOrchestrationInstance();
- $this->writePayload(json_encode([
- 'id' => $this->target->id,
- 'execution' => $actualId->executionId,
- 'instance' => $actualId->instanceId,
- ], JSON_THROW_ON_ERROR));
- }
-
- private function writePayload(string $payload): void
- {
- fseek($this->payloadHandle, 0);
- ftruncate($this->payloadHandle, 0);
- fwrite($this->payloadHandle, $payload);
}
private function orchestrationSignal(): void
@@ -233,27 +194,18 @@ private function orchestrationSignal(): void
$signal = $_SERVER['HTTP_SIGNAL'];
$event = WithOrchestration::forInstance($this->target, RaiseEvent::forCustom($signal, $this->payload));
$this->outputEvent(new EventDescription($event));
- $this->writePayload('');
}
private function entityDecoder(): void
{
- $state = file_get_contents($_SERVER['HTTP_ENTITY_STATE']);
+ $state = $this->worker->getState();
if (empty($state)) {
- fwrite($this->payloadHandle, 'null');
-
return;
}
- $state = json_decode($state, true, 512, JSON_THROW_ON_ERROR);
$state = Serializer::deserialize($state, EntityHistory::class);
if ($state->getState() !== null) {
$state = Serializer::serialize($state->getState(), ['API']);
- } else {
- $this->writePayload('null');
-
- return;
}
- $this->writePayload(json_encode($state, JSON_THROW_ON_ERROR));
}
private function getPermissions(): void
@@ -361,9 +313,3 @@ private function getFromDefinition(Definition $definition): ReflectionClass|Refl
return null;
}
}
-
-header('Content-type: application/dphp');
-
-(new Glue($logger))->process();
-
-exit();
diff --git a/src/Glue/autoload.php b/src/Glue/autoload.php
index f25e2b49..cf5560d1 100644
--- a/src/Glue/autoload.php
+++ b/src/Glue/autoload.php
@@ -38,6 +38,7 @@
}
echo "ERROR: FAILED TO LOCATE AUTOLOADER\n";
+
return;
verify_protocol:
@@ -47,9 +48,3 @@
'INFO' => Level::Info,
default => Level::Error,
});
-
-if (($_SERVER['SERVER_PROTOCOL'] ?? null) !== 'DPHP/1.0') {
- http_response_code(400);
- $logger->critical('Invalid request protocol', [$_SERVER['SERVER_PROTOCOL'] ?? null]);
- die();
-}
diff --git a/src/Glue/worker.php b/src/Glue/worker.php
new file mode 100644
index 00000000..d0117119
--- /dev/null
+++ b/src/Glue/worker.php
@@ -0,0 +1,36 @@
+info('Starting worker');
+
+frankenphp_handle_request(static function (): void {
+ global $logger;
+
+ $glue = new Glue($logger);
+ $glue->process();
+});
diff --git a/src/LocalEntityClient.php b/src/LocalEntityClient.php
new file mode 100644
index 00000000..57813592
--- /dev/null
+++ b/src/LocalEntityClient.php
@@ -0,0 +1,207 @@
+getParameters()[0]?->getType()?->getName();
+ if ($interfaceName === null || interface_exists($interfaceName) === false) {
+ throw new Exception("Interface {$interfaceName} does not exist");
+ }
+ $spy = $this->spyProxy->define($interfaceName);
+ $operationName = null;
+ $arguments = null;
+ try {
+ $class = new $spy($operationName, $arguments);
+ $signal($class);
+ } catch (SpyException) {
+ }
+
+ if ($operationName === null || $arguments === null) {
+ return;
+ }
+
+ $this->signalEntity(
+ is_string($entityId) ? EntityId($interfaceName, $entityId) : $entityId,
+ $operationName,
+ $arguments,
+ );
+ }
+
+ #[Override]
+ public function signalEntity(
+ EntityId $entityId,
+ string $operationName,
+ array $input = [],
+ ?DateTimeImmutable $scheduledTime = null,
+ ): void {
+ $event = WithEntity::forInstance(
+ StateId::fromEntityId($entityId),
+ RaiseEvent::forOperation($operationName, $input),
+ );
+
+ if ($scheduledTime) {
+ $event = WithDelay::forEvent($scheduledTime, $event);
+ }
+
+ $this->worker->emitEvent(new EventDescription($event)->toArray());
+ }
+
+ #[Override]
+ public function getEntitySnapshot(EntityId $entityId): ?EntityState
+ {
+ $state = $this->worker->queryState(StateId::fromEntityId($entityId));
+ if (empty($state)) {
+ return null;
+ }
+
+ return Serializer::deserialize($state, EntityState::class);
+ }
+
+ #[Override]
+ public function withAuth(Provenance|string|null $token): void
+ {
+ $this->worker->setUser($token instanceof Provenance ? Serializer::serialize($token) : null);
+ }
+
+ #[Override]
+ public function deleteEntity(EntityId $entityId): void
+ {
+ $this->worker->emitEvent(
+ new EventDescription(
+ WithEntity::forInstance(
+ StateId::fromEntityId($entityId),
+ RaiseEvent::forOperation('delete', []),
+ ),
+ )->toArray(),
+ );
+ }
+
+ public function shareEntityOwnership(EntityId $id, string $with): void
+ {
+ $this->worker->emitEvent(
+ new EventDescription(
+ WithEntity::forInstance(
+ StateId::fromEntityId($id),
+ ShareOwnership::withUser($with),
+ ),
+ )->toArray(),
+ );
+ }
+
+ public function grantEntityAccessToUser(EntityId $id, string $user, Operation $operation): void
+ {
+ $this->worker->emitEvent(
+ new EventDescription(
+ WithEntity::forInstance(
+ StateId::fromEntityId($id),
+ ShareWithUser::For($user, $operation),
+ ),
+ )->toArray(),
+ );
+ }
+
+ public function grantEntityAccessToRole(EntityId $id, string $role, Operation $operation): void
+ {
+ $this->worker->emitEvent(
+ new EventDescription(
+ WithEntity::forInstance(
+ StateId::fromEntityId($id),
+ ShareWithRole::For($role, $operation),
+ ),
+ )->toArray(),
+ );
+ }
+
+ public function revokeEntityAccessToUser(EntityId $id, string $user): void
+ {
+ $this->worker->emitEvent(
+ new EventDescription(
+ WithEntity::forInstance(
+ StateId::fromEntityId($id),
+ RevokeUser::completely($user),
+ ),
+ )->toArray(),
+ );
+ }
+
+ public function revokeEntityAccessToRole(EntityId $id, string $role): void
+ {
+ $this->worker->emitEvent(
+ new EventDescription(
+ WithEntity::forInstance(
+ StateId::fromEntityId($id),
+ RevokeRole::completely($role),
+ ),
+ )->toArray(),
+ );
+ }
+}
diff --git a/src/LocalOrchestrationClient.php b/src/LocalOrchestrationClient.php
new file mode 100644
index 00000000..9faba901
--- /dev/null
+++ b/src/LocalOrchestrationClient.php
@@ -0,0 +1,171 @@
+worker->queryState(StateId::fromInstance($instance));
+
+ return Serializer::deserialize($result, Status::class);
+ }
+
+ #[Override]
+ public function raiseEvent(OrchestrationInstance $instance, string $eventName, array $eventData): void
+ {
+ $event = WithOrchestration::forInstance(
+ StateId::fromInstance($instance),
+ RaiseEvent::forOperation($eventName, $eventData),
+ );
+
+ $eventDescription = new EventDescription($event);
+ $this->worker->emitEvent($eventDescription->toArray());
+ }
+
+ #[Override]
+ public function restart(OrchestrationInstance $instance): void
+ {
+ throw new Exception('not implemented');
+ }
+
+ #[Override]
+ public function resume(OrchestrationInstance $instance, string $reason): void
+ {
+ throw new Exception('not implemented');
+ }
+
+ #[Override]
+ public function startNew(string $name, array $args = [], ?string $id = null): OrchestrationInstance
+ {
+ $orchestrationId = \Bottledcode\DurablePhp\OrchestrationInstance($name, $id ?? Uuid::uuid4()->toString());
+ $stateId = StateId::fromInstance($orchestrationId);
+
+ $event = WithOrchestration::forInstance(
+ $stateId,
+ StartOrchestration::forInstance($orchestrationId),
+ );
+
+ $eventDescription = new EventDescription($event);
+ $userArray = $this->userContext ? Serializer::serialize($this->userContext) : null;
+
+ $sequence = emit_event($userArray, $eventDescription->toArray(), $stateId->id);
+
+ $event = WithOrchestration::forInstance($stateId, StartExecution::asParent($args, []));
+ $eventDescription = new EventDescription($event);
+ emit_event($userArray, $eventDescription->toArray(), $sequence);
+
+ return $orchestrationId;
+ }
+
+ #[Override]
+ public function suspend(OrchestrationInstance $instance, string $reason): void
+ {
+ throw new Exception('suspend not implemented in LocalOrchestrationClient');
+ }
+
+ #[Override]
+ public function terminate(OrchestrationInstance $instance, string $reason): void
+ {
+ throw new Exception('terminate not implemented in LocalOrchestrationClient');
+ }
+
+ #[Override]
+ public function waitForCompletion(OrchestrationInstance $instance): void
+ {
+ throw new Exception('waitForCompletion not supported in LocalOrchestrationClient - use RemoteOrchestrationClient for this operation');
+ }
+
+ #[Override]
+ public function withAuth(Provenance|string|null $token): void
+ {
+ $this->worker->setUser($token instanceof Provenance ? Serializer::serialize($token) : null);
+ }
+
+ public function shareOrchestrationOwnership(OrchestrationInstance $id, string $with): void
+ {
+ throw new Exception('shareOrchestrationOwnership not supported in LocalOrchestrationClient - use RemoteOrchestrationClient for this operation');
+ }
+
+ public function grantOrchestrationAccessToUser(OrchestrationInstance $id, string $user, Operation $operation): void
+ {
+ throw new Exception('grantOrchestrationAccessToUser not supported in LocalOrchestrationClient - use RemoteOrchestrationClient for this operation');
+ }
+
+ public function grantOrchestrationAccessToRole(OrchestrationInstance $id, string $role, Operation $operation): void
+ {
+ throw new Exception('grantOrchestrationAccessToRole not supported in LocalOrchestrationClient - use RemoteOrchestrationClient for this operation');
+ }
+
+ public function revokeOrchestrationAccessToUser(OrchestrationInstance $id, string $user): void
+ {
+ throw new Exception('revokeOrchestrationAccessToUser not supported in LocalOrchestrationClient - use RemoteOrchestrationClient for this operation');
+ }
+
+ public function revokeOrchestrationAccessToRole(OrchestrationInstance $id, string $role): void
+ {
+ throw new Exception('revokeOrchestrationAccessToRole not supported in LocalOrchestrationClient - use RemoteOrchestrationClient for this operation');
+ }
+}
diff --git a/src/OrchestrationClientInterface.php b/src/OrchestrationClientInterface.php
index 69f490c6..02f9addb 100644
--- a/src/OrchestrationClientInterface.php
+++ b/src/OrchestrationClientInterface.php
@@ -25,13 +25,14 @@
namespace Bottledcode\DurablePhp;
use Bottledcode\DurablePhp\Events\Shares\Operation;
+use Bottledcode\DurablePhp\Glue\Provenance;
use Bottledcode\DurablePhp\State\OrchestrationInstance;
use Bottledcode\DurablePhp\State\Status;
use Generator;
interface OrchestrationClientInterface
{
- public function withAuth(string $token): void;
+ public function withAuth(Provenance|string|null $token): void;
public function getStatus(OrchestrationInstance $instance): Status;
diff --git a/src/RemoteEntityClient.php b/src/RemoteEntityClient.php
index 8b2dc8a0..1a0ee5c6 100644
--- a/src/RemoteEntityClient.php
+++ b/src/RemoteEntityClient.php
@@ -27,6 +27,7 @@
use Amp\Http\Client\HttpClient;
use Amp\Http\Client\Request;
use Bottledcode\DurablePhp\Events\Shares\Operation;
+use Bottledcode\DurablePhp\Glue\Provenance;
use Bottledcode\DurablePhp\Proxy\SpyException;
use Bottledcode\DurablePhp\Proxy\SpyProxy;
use Bottledcode\DurablePhp\Search\EntityFilter;
@@ -146,7 +147,7 @@ public function getEntitySnapshot(EntityId $entityId): ?EntityState
}
#[Override]
- public function withAuth(string $token): void
+ public function withAuth(Provenance|string|null $token): void
{
$this->userToken = $token;
}
diff --git a/src/RemoteOrchestrationClient.php b/src/RemoteOrchestrationClient.php
index 39b2d592..af063c73 100644
--- a/src/RemoteOrchestrationClient.php
+++ b/src/RemoteOrchestrationClient.php
@@ -28,6 +28,7 @@
use Amp\Http\Client\Request;
use Amp\Http\Client\SocketException;
use Bottledcode\DurablePhp\Events\Shares\Operation;
+use Bottledcode\DurablePhp\Glue\Provenance;
use Bottledcode\DurablePhp\Proxy\SpyProxy;
use Bottledcode\DurablePhp\State\Ids\StateId;
use Bottledcode\DurablePhp\State\OrchestrationInstance;
@@ -195,7 +196,7 @@ public function waitForCompletion(OrchestrationInstance $instance): void
}
#[Override]
- public function withAuth(string $token): void
+ public function withAuth(Provenance|string|null $token): void
{
$this->userToken = $token;
}
diff --git a/src/State/Ids/StateId.php b/src/State/Ids/StateId.php
index 4cde9555..4f0d88d9 100644
--- a/src/State/Ids/StateId.php
+++ b/src/State/Ids/StateId.php
@@ -31,6 +31,7 @@
use Bottledcode\DurablePhp\State\OrchestrationInstance;
use Bottledcode\DurablePhp\State\StateInterface;
use Crell\Serde\Attributes\ClassNameTypeMap;
+use Crell\Serde\Attributes\Field;
use Exception;
use Ramsey\Uuid\Uuid;
use Ramsey\Uuid\UuidInterface;
@@ -43,7 +44,8 @@
#[ClassNameTypeMap('__type')]
readonly class StateId extends Record implements Stringable
{
- public protected(set) string $id;
+ #[Field(flatten: true)]
+ public string $id;
public static function fromState(StateInterface $state): self
{