diff --git a/internal/buildengine/build.go b/internal/buildengine/build.go index 09fe655d75..7c6b866e83 100644 --- a/internal/buildengine/build.go +++ b/internal/buildengine/build.go @@ -21,6 +21,7 @@ import ( "github.com/block/ftl/internal/exec" "github.com/block/ftl/internal/projectconfig" "github.com/block/ftl/internal/sql" + "github.com/block/ftl/internal/watch" ) const ( @@ -35,7 +36,7 @@ var errSQLError = errors.New("failed to add queries to schema") // Plugins must use a lock file to ensure that only one build is running at a time. // // Returns invalidateDependenciesError if the build failed due to a change in dependencies. -func build(ctx context.Context, projectConfig projectconfig.Config, m Module, plugin *languageplugin.LanguagePlugin, bctx languageplugin.BuildContext, devMode bool, devModeEndpoints chan dev.LocalEndpoint) (moduleSchema *schema.Module, tmpDeployDir string, deployPaths []string, err error) { +func build(ctx context.Context, projectConfig projectconfig.Config, m Module, plugin *languageplugin.LanguagePlugin, fileTransaction watch.ModifyFilesTransaction, bctx languageplugin.BuildContext, devMode bool, devModeEndpoints chan dev.LocalEndpoint) (moduleSchema *schema.Module, tmpDeployDir string, deployPaths []string, err error) { logger := log.FromContext(ctx).Module(bctx.Config.Module).Scope("build") ctx = log.ContextWithLogger(ctx, logger) @@ -43,8 +44,9 @@ func build(ctx context.Context, projectConfig projectconfig.Config, m Module, pl if err != nil { return nil, "", nil, errors.WithStack(errors.Join(errSQLError, err)) } + stubsRoot := stubsLanguageDir(projectConfig.Root(), bctx.Config.Language) - moduleSchema, tmpDeployDir, deployPaths, err = handleBuildResult(ctx, projectConfig, m, result.From(plugin.Build(ctx, projectConfig, stubsRoot, bctx, devMode)), devMode, devModeEndpoints, optional.Some(bctx.Schema)) + moduleSchema, tmpDeployDir, deployPaths, err = handleBuildResult(ctx, projectConfig, m, fileTransaction, result.From(plugin.Build(ctx, projectConfig, stubsRoot, bctx, devMode)), devMode, devModeEndpoints, optional.Some(bctx.Schema)) if err != nil { return nil, "", nil, errors.WithStack(err) } @@ -52,7 +54,7 @@ func build(ctx context.Context, projectConfig projectconfig.Config, m Module, pl } // handleBuildResult processes the result of a build -func handleBuildResult(ctx context.Context, projectConfig projectconfig.Config, m Module, eitherResult result.Result[languageplugin.BuildResult], devMode bool, devModeEndpoints chan dev.LocalEndpoint, schemaOpt optional.Option[*schema.Schema]) (moduleSchema *schema.Module, tmpDeployDir string, deployPaths []string, err error) { +func handleBuildResult(ctx context.Context, projectConfig projectconfig.Config, m Module, fileTransaction watch.ModifyFilesTransaction, eitherResult result.Result[languageplugin.BuildResult], devMode bool, devModeEndpoints chan dev.LocalEndpoint, schemaOpt optional.Option[*schema.Schema]) (moduleSchema *schema.Module, tmpDeployDir string, deployPaths []string, err error) { logger := log.FromContext(ctx) config := m.Config.Abs() @@ -61,6 +63,13 @@ func handleBuildResult(ctx context.Context, projectConfig projectconfig.Config, return nil, "", nil, errors.Wrap(err, "failed to build module") } + if len(result.ModifiedFiles) > 0 { + logger.Infof("Modified files: %v", result.ModifiedFiles) + } + if err := fileTransaction.ModifiedFiles(result.ModifiedFiles...); err != nil { + return nil, "", nil, errors.Wrap(err, "failed to apply modified files") + } + if result.InvalidateDependencies { return nil, "", nil, errors.WithStack(errInvalidateDependencies) } diff --git a/internal/buildengine/engine.go b/internal/buildengine/engine.go index 29e8c018f9..2ce16711c0 100644 --- a/internal/buildengine/engine.go +++ b/internal/buildengine/engine.go @@ -8,7 +8,6 @@ import ( "runtime" "sort" "strings" - "sync" "time" "connectrpc.com/connect" @@ -30,7 +29,6 @@ import ( "github.com/block/ftl/common/slices" "github.com/block/ftl/internal/buildengine/languageplugin" "github.com/block/ftl/internal/dev" - imaps "github.com/block/ftl/internal/maps" "github.com/block/ftl/internal/moduleconfig" "github.com/block/ftl/internal/projectconfig" "github.com/block/ftl/internal/realm" @@ -45,8 +43,8 @@ var _ rpc.Service = (*Engine)(nil) type moduleMeta struct { module Module plugin *languageplugin.LanguagePlugin - events chan languageplugin.PluginEvent configDefaults moduleconfig.CustomDefaults + watcher *watch.Watcher } // copyMetaWithUpdatedDependencies finds the dependencies for a module and returns a @@ -104,9 +102,6 @@ type Engine struct { buildEnv []string startTime optional.Option[time.Time] - // events coming in from plugins - pluginEvents chan languageplugin.PluginEvent - // requests to rebuild modules due to dependencies changing or plugins dying rebuildEvents chan rebuildEvent @@ -181,12 +176,12 @@ func New( rawEngineUpdates := make(chan *buildenginepb.EngineEvent, 128) e := &Engine{ - adminClient: adminClient, - projectConfig: projectConfig, - moduleDirs: moduleDirs, - moduleMetas: xsync.NewMapOf[string, moduleMeta](), - watcher: watch.NewWatcher(optional.Some(projectConfig.WatchModulesLockPath()), "ftl.toml", "**/*.sql"), - pluginEvents: make(chan languageplugin.PluginEvent, 128), + adminClient: adminClient, + projectConfig: projectConfig, + moduleDirs: moduleDirs, + moduleMetas: xsync.NewMapOf[string, moduleMeta](), + watcher: watch.NewWatcher(optional.Some(projectConfig.WatchModulesLockPath())), + // pluginEvents: make(chan languageplugin.PluginEvent, 128), parallelism: runtime.NumCPU(), modulesToBuild: xsync.NewMapOf[string, bool](), rebuildEvents: make(chan rebuildEvent, 128), @@ -225,7 +220,6 @@ func New( updateTerminalWithEngineEvents(ctx, e.engineUpdates) - go e.watchForPluginEvents(ctx) e.updatesService = e.startUpdatesService(ctx) go e.watchForEventsToPublish(ctx, len(configs) > 0) @@ -430,35 +424,45 @@ func (e *Engine) Modules() []string { // Dev builds and deploys all local modules and watches for changes, redeploying as necessary. func (e *Engine) Dev(ctx context.Context, period time.Duration) error { - return errors.WithStack(e.watchForModuleChanges(ctx, period)) + return errors.WithStack(e.processEvents(ctx, period)) } -// watchForModuleChanges watches for changes and all build start and event state changes. -func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration) error { +func (e *Engine) processEvents(ctx context.Context, period time.Duration) error { logger := log.FromContext(ctx) - watchEvents := make(chan watch.WatchEvent, 128) - topic, err := e.watcher.Watch(ctx, period, e.moduleDirs) - if err != nil { - return errors.Wrap(err, "failed to start watcher") - } - topic.Subscribe(watchEvents) + moduleWatchCancellations := map[string]context.CancelCauseFunc{} + moduleChanges := make(chan watch.WatchEvent, 128) + e.moduleMetas.Range(func(name string, meta moduleMeta) bool { + if err := e.watchModuleForChanges(ctx, meta, period, moduleChanges, moduleWatchCancellations); err != nil { + logger.Errorf(err, "failed to watch module %s", meta.module.Config.Module) + } + return true + }) // Build and deploy all modules first. if err := e.BuildAndDeploy(ctx, optional.None[int32](), true, false); err != nil { logger.Errorf(err, "Initial build and deploy failed") } + moduleListChanges := make(chan watch.WatchEvent, 16) + moduleListTopic, err := e.watcher.Watch(ctx, period, e.moduleDirs) + if err != nil { + return errors.Wrap(err, "failed to start watcher") + } + moduleListTopic.Subscribe(moduleListChanges) + // Update schema and set initial module hashes +drainSchemaUpdates: for { select { case event := <-e.deployCoordinator.SchemaUpdates: e.targetSchema.Store(event.schema) - continue + default: + break drainSchemaUpdates } - break } + moduleHashes := map[string][]byte{} for _, sch := range e.targetSchema.Load().InternalModules() { hash, err := computeModuleHash(sch) @@ -473,7 +477,7 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration case <-ctx.Done(): return errors.WithStack(ctx.Err()) - case event, ok := <-watchEvents: + case event, ok := <-moduleListChanges: if !ok { // Watcher stopped unexpectedly (channel closed). logger.Debugf("Watch event channel closed, watcher likely stopped.") @@ -482,77 +486,41 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration } return errors.New("watcher stopped unexpectedly") } + switch event := event.(type) { case watch.WatchEventModuleAdded: - logger.Debugf("Module %q added", event.Config.Module) - config := event.Config - if _, exists := e.moduleMetas.Load(config.Module); !exists { - meta, err := e.newModuleMeta(ctx, config) - logger.Debugf("generated meta for %q", event.Config.Module) - if err != nil { - logger.Errorf(err, "could not add module %s", config.Module) - continue - } - e.moduleMetas.Store(config.Module, meta) - e.rawEngineUpdates <- &buildenginepb.EngineEvent{ - Timestamp: timestamppb.Now(), - Event: &buildenginepb.EngineEvent_ModuleAdded{ - ModuleAdded: &buildenginepb.ModuleAdded{ - Module: config.Module, - }, - }, - } - logger.Debugf("calling build and deploy %q", event.Config.Module) - if err := e.BuildAndDeploy(ctx, optional.None[int32](), false, false, config.Module); err != nil { - logger.Errorf(err, "Build and deploy failed for added module %s", config.Module) - } - } - case watch.WatchEventModuleRemoved: - err := e.deployCoordinator.terminateModuleDeployment(ctx, event.Config.Module) - if err != nil { - logger.Errorf(err, "terminate %s failed", event.Config.Module) - } - if meta, ok := e.moduleMetas.Load(event.Config.Module); ok { - meta.plugin.Updates().Unsubscribe(meta.events) - err := meta.plugin.Kill() - if err != nil { - logger.Errorf(err, "terminate %s plugin failed", event.Config.Module) - } - } - e.moduleMetas.Delete(event.Config.Module) - e.modulesToBuild.Delete(event.Config.Module) - e.rawEngineUpdates <- &buildenginepb.EngineEvent{ - Timestamp: timestamppb.Now(), - Event: &buildenginepb.EngineEvent_ModuleRemoved{ - ModuleRemoved: &buildenginepb.ModuleRemoved{ - Module: event.Config.Module, - }, - }, - } - case watch.WatchEventModuleChanged: - // ftl.toml file has changed - meta, ok := e.moduleMetas.Load(event.Config.Module) - if !ok { - logger.Warnf("Module %q not found", event.Config.Module) + _, exists := e.moduleMetas.Load(event.Config.Module) + if exists { continue } - - updatedConfig, err := moduleconfig.LoadConfig(event.Config.Dir) + meta, err := e.handleNewModule(ctx, event.Config) if err != nil { - logger.Errorf(err, "Could not load updated toml for %s", event.Config.Module) + logger.Errorf(err, "could not add module %s", meta.module.Config.Module) continue } - validConfig, err := updatedConfig.FillDefaultsAndValidate(meta.configDefaults, e.projectConfig) - if err != nil { - logger.Errorf(err, "Could not configure module config defaults for %s", event.Config.Module) + if err := e.watchModuleForChanges(ctx, meta, period, moduleChanges, moduleWatchCancellations); err != nil { + logger.Errorf(err, "failed to watch module %s", meta.module.Config.Module) continue } - meta.module.Config = validConfig - e.moduleMetas.Store(event.Config.Module, meta) - - if err := e.BuildAndDeploy(ctx, optional.None[int32](), false, false, event.Config.Module); err != nil { - logger.Errorf(err, "Build and deploy failed for updated module %s", event.Config.Module) + e.triggerBuildAndDeploy(ctx, event.Config.Module) + case watch.WatchEventModuleRemoved: + if cancel, ok := moduleWatchCancellations[event.Config.Module]; ok { + cancel(errors.Wrap(context.Canceled, "module removed")) } + delete(moduleWatchCancellations, event.Config.Module) + e.handleModuleRemoval(ctx, event.Config) + case watch.WatchEventModuleChanged: + // Changes within a module are not handled here + } + + case event := <-moduleChanges: + switch event := event.(type) { + case watch.WatchEventModuleAdded, *watch.WatchEventModuleRemoved: + // Module detectiomn is not handle here + case watch.WatchEventModuleChanged: + // Changes within a module are not handled here + logger.Infof("Module %q changed: %s", event.Config.Module, event.String()) + e.triggerBuildAndDeploy(ctx, event.Config.Module) } case event := <-e.deployCoordinator.SchemaUpdates: e.targetSchema.Store(event.schema) @@ -578,6 +546,12 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration moduleHashes[module.Name] = hash + // // Sync references to stubs if needed by the runtime + // err = e.syncNewStubReferences(ctx, builtSchemas, metasMap) + // if err != nil { + // logger.Errorf(err, "Failed to sync stub references") + // } + dependentModuleNames := e.getDependentModuleNames(module.Name) dependentModuleNames = slices.Filter(dependentModuleNames, func(name string) bool { // We don't update if this was already part of the same changeset @@ -585,81 +559,83 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration }) if len(dependentModuleNames) > 0 { logger.Infof("%s's schema changed; processing %s", module.Name, strings.Join(dependentModuleNames, ", ")) //nolint:forbidigo - if err := e.BuildAndDeploy(ctx, optional.None[int32](), false, false, dependentModuleNames...); err != nil { - logger.Errorf(err, "Build and deploy failed for dependent modules of %s", module.Name) + for _, name := range dependentModuleNames { + e.triggerBuildAndDeploy(ctx, name) } } } + } + } +} - case event := <-e.rebuildEvents: - events := []rebuildEvent{event} - readLoop: - for { - select { - case event := <-e.rebuildEvents: - events = append(events, event) - default: - break readLoop - } - } - // Batch generate stubs for all auto rebuilds - // - // This is normally part of each group in the build topology, but auto rebuilds do not go through that flow - builtModuleEvents := map[string]autoRebuildCompletedEvent{} - for _, event := range events { - event, ok := event.(autoRebuildCompletedEvent) - if !ok { - continue - } - builtModuleEvents[event.module] = event - } - if len(builtModuleEvents) > 0 { - metasMap := map[string]moduleMeta{} - e.moduleMetas.Range(func(name string, meta moduleMeta) bool { - metasMap[name] = meta - return true - }) - builtSchemas := imaps.MapValues(builtModuleEvents, func(_ string, e autoRebuildCompletedEvent) *schema.Module { return e.schema }) - err = GenerateStubs(ctx, e.projectConfig.Root(), maps.Values(builtSchemas), metasMap) - if err != nil { - logger.Errorf(err, "Failed to generate stubs") - } - - // Sync references to stubs if needed by the runtime - err = e.syncNewStubReferences(ctx, builtSchemas, metasMap) - if err != nil { - logger.Errorf(err, "Failed to sync stub references") - } +func (e *Engine) handleNewModule(ctx context.Context, config moduleconfig.UnvalidatedModuleConfig) (moduleMeta, error) { + logger := log.FromContext(ctx) + logger.Debugf("Module %q added", config.Module) + meta, err := e.newModuleMeta(ctx, config) + logger.Debugf("generated meta for %q", config.Module) + if err != nil { + return moduleMeta{}, errors.WithStack(err) + } + e.moduleMetas.Store(config.Module, meta) + e.rawEngineUpdates <- &buildenginepb.EngineEvent{ + Timestamp: timestamppb.Now(), + Event: &buildenginepb.EngineEvent_ModuleAdded{ + ModuleAdded: &buildenginepb.ModuleAdded{ + Module: config.Module, + }, + }, + } + return meta, nil +} - // Deploy modules - var modulesToDeploy = []*pendingModule{} - for _, event := range builtModuleEvents { - moduleToDeploy, ok := e.moduleMetas.Load(event.module) - if ok { - modulesToDeploy = append(modulesToDeploy, newPendingModule(moduleToDeploy.module, event.tmpDeployDir, event.deployPaths, event.schema)) - } - } - go func() { - _ = e.deployCoordinator.deploy(ctx, modulesToDeploy, optional.None[int32]()) //nolint:errcheck - }() - } +func (e *Engine) watchModuleForChanges(ctx context.Context, meta moduleMeta, period time.Duration, subscriber chan watch.WatchEvent, moduleCancellations map[string]context.CancelCauseFunc) error { + name := meta.module.Config.Module + if existing, ok := moduleCancellations[name]; ok { + existing(errors.New("replacing existing watcher for module")) + delete(moduleCancellations, name) + } + config := meta.module.Config.Abs() + ctx, cancel := context.WithCancelCause(ctx) + updates, err := meta.watcher.Watch(ctx, period, []string{config.Dir}) + if err != nil { + cancel(context.Canceled) + return errors.Wrapf(err, "failed to watch module directory %s", config.Dir) + } + updates.Subscribe(subscriber) + moduleCancellations[name] = cancel + return nil +} - // Batch together all new builds requested - modulesToBuild := map[string]bool{} - for _, event := range events { - event, ok := event.(rebuildRequestEvent) - if !ok { - continue - } - modulesToBuild[event.module] = true - } - if len(modulesToBuild) > 0 { - if err := e.BuildAndDeploy(ctx, optional.None[int32](), false, false, maps.Keys(modulesToBuild)...); err != nil { - logger.Errorf(err, "Build and deploy failed for rebuild requested modules") - } - } +func (e *Engine) handleModuleRemoval(ctx context.Context, config moduleconfig.UnvalidatedModuleConfig) { + logger := log.FromContext(ctx) + err := e.deployCoordinator.terminateModuleDeployment(ctx, config.Module) + if err != nil { + logger.Errorf(err, "terminate %s failed", config.Module) + } + if meta, ok := e.moduleMetas.Load(config.Module); ok { + err := meta.plugin.Kill() + if err != nil { + logger.Errorf(err, "terminate %s plugin failed", config.Module) } } + e.moduleMetas.Delete(config.Module) + e.modulesToBuild.Delete(config.Module) + e.rawEngineUpdates <- &buildenginepb.EngineEvent{ + Timestamp: timestamppb.Now(), + Event: &buildenginepb.EngineEvent_ModuleRemoved{ + ModuleRemoved: &buildenginepb.ModuleRemoved{ + Module: config.Module, + }, + }, + } +} + +func (e *Engine) triggerBuildAndDeploy(ctx context.Context, moduleName string) { + logger := log.FromContext(ctx) + logger.Debugf("calling build and deploy %q", moduleName) + if err := e.BuildAndDeploy(ctx, optional.None[int32](), false, false, moduleName); err != nil { + logger.Errorf(err, "Build and deploy failed for module %s", moduleName) + } } type moduleState int @@ -1147,34 +1123,6 @@ func (e *Engine) handleDependencyCycleError(ctx context.Context, depErr Dependen return nil } remainingModulesErr := e.buildWithCallback(ctx, callback, remaining...) - - wg := &sync.WaitGroup{} - for _, module := range depErr.Modules { - // Make sure each module in dependency cycle has an active build stream so changes to dependencies are detected - wg.Add(1) - go func() { - defer wg.Done() - - ignoredSchemas := make(chan *schema.Module, 1) - fakeDeps := map[string]*schema.Module{ - "builtin": schema.Builtins(), - } - for _, dep := range graph[module] { - if sch, ok := e.GetModuleSchema(dep); ok { - fakeDeps[dep] = sch - continue - } - // not build yet, probably due to dependency cycle - fakeDeps[dep] = &schema.Module{ - Name: dep, - Comments: []string{"Dependency not built yet due to dependency cycle"}, - } - } - _, _, _, _ = e.build(ctx, module, fakeDeps, ignoredSchemas) //nolint:errcheck - close(ignoredSchemas) - }() - } - wg.Wait() return errors.WithStack(remainingModulesErr) } @@ -1246,7 +1194,8 @@ func (e *Engine) build(ctx context.Context, moduleName string, builtModules map[ sch := &schema.Schema{Realms: []*schema.Realm{{Modules: maps.Values(builtModules)}}} //nolint:exptostd - configProto, err := langpb.ModuleConfigToProto(meta.module.Config.Abs()) + config := meta.module.Config.Abs() + configProto, err := langpb.ModuleConfigToProto(config) if err != nil { return nil, "", nil, errors.Wrap(err, "failed to marshal module config") } @@ -1254,7 +1203,11 @@ func (e *Engine) build(ctx context.Context, moduleName string, builtModules map[ meta.module = meta.module.CopyWithSQLErrors(nil) e.moduleMetas.Store(moduleName, meta) } - moduleSchema, tmpDeployDir, deployPaths, err := build(ctx, e.projectConfig, meta.module, meta.plugin, languageplugin.BuildContext{ + transaction := meta.watcher.GetTransaction(config.Dir) + if err := transaction.Begin(); err != nil { + return nil, "", nil, errors.Wrapf(err, "failed to begin file transaction for %s", config.Dir) + } + moduleSchema, tmpDeployDir, deployPaths, err := build(ctx, e.projectConfig, meta.module, meta.plugin, transaction, languageplugin.BuildContext{ Config: meta.module.Config, Schema: sch, Dependencies: meta.module.Dependencies(Raw), @@ -1262,6 +1215,9 @@ func (e *Engine) build(ctx context.Context, moduleName string, builtModules map[ Os: e.os, Arch: e.arch, }, e.devMode, e.devModeEndpointUpdates) + if err := transaction.End(); err != nil { + return nil, "", nil, errors.Wrapf(err, "failed to end file transaction for %s", config.Dir) + } if err != nil { if errors.Is(err, errSQLError) { @@ -1368,25 +1324,6 @@ func (e *Engine) newModuleMeta(ctx context.Context, config moduleconfig.Unvalida if err != nil { return moduleMeta{}, errors.Wrapf(err, "could not create plugin for %s", config.Module) } - events := make(chan languageplugin.PluginEvent, 64) - plugin.Updates().Subscribe(events) - - // pass on plugin events to the main event channel - // make sure we do not pass on nil (chan closure) events - go func() { - for { - select { - case event := <-events: - if event == nil { - // chan closed - return - } - e.pluginEvents <- event - case <-ctx.Done(): - return - } - } - }() // update config with defaults customDefaults, err := languageplugin.GetModuleConfigDefaults(ctx, config.Language, config.Dir) @@ -1397,123 +1334,15 @@ func (e *Engine) newModuleMeta(ctx context.Context, config moduleconfig.Unvalida if err != nil { return moduleMeta{}, errors.Wrapf(err, "could not apply defaults for %s", config.Module) } + + patterns := validConfig.Watch + patterns = append(patterns, "ftl.toml", "**/*.sql") + watcher := watch.NewWatcher(optional.None[string](), patterns...) + return moduleMeta{ module: newModule(validConfig), plugin: plugin, - events: events, configDefaults: customDefaults, + watcher: watcher, }, nil } - -// watchForPluginEvents listens for build updates from language plugins and reports them to the listener. -// These happen when a plugin for a module detects a change and automatically rebuilds. -func (e *Engine) watchForPluginEvents(originalCtx context.Context) { - for { - select { - case event := <-e.pluginEvents: - switch event := event.(type) { - case languageplugin.PluginBuildEvent, languageplugin.AutoRebuildStartedEvent, languageplugin.AutoRebuildEndedEvent: - buildEvent := event.(languageplugin.PluginBuildEvent) //nolint:forcetypeassert - logger := log.FromContext(originalCtx).Module(buildEvent.ModuleName()).Scope("build") - ctx := log.ContextWithLogger(originalCtx, logger) - meta, ok := e.moduleMetas.Load(buildEvent.ModuleName()) - if !ok { - logger.Warnf("module not found for build update") - continue - } - configProto, err := langpb.ModuleConfigToProto(meta.module.Config.Abs()) - if err != nil { - continue - } - switch event := buildEvent.(type) { - case languageplugin.AutoRebuildStartedEvent: - e.rawEngineUpdates <- &buildenginepb.EngineEvent{ - Timestamp: timestamppb.Now(), - Event: &buildenginepb.EngineEvent_ModuleBuildStarted{ - ModuleBuildStarted: &buildenginepb.ModuleBuildStarted{ - Config: configProto, - IsAutoRebuild: true, - }, - }, - } - - case languageplugin.AutoRebuildEndedEvent: - moduleSch, tmpDeployDir, deployPaths, err := handleBuildResult(ctx, e.projectConfig, meta.module, event.Result, e.devMode, e.devModeEndpointUpdates, optional.None[*schema.Schema]()) - if err != nil { - if errors.Is(err, errInvalidateDependencies) { - e.rawEngineUpdates <- &buildenginepb.EngineEvent{ - Timestamp: timestamppb.Now(), - Event: &buildenginepb.EngineEvent_ModuleBuildWaiting{ - ModuleBuildWaiting: &buildenginepb.ModuleBuildWaiting{ - Config: configProto, - }, - }, - } - // Do not block this goroutine by building a module here. - // Instead we send to a chan so that it can be processed elsewhere. - e.rebuildEvents <- rebuildRequestEvent{module: event.ModuleName()} - // We don't update the state to failed, as it is going to be rebuilt - continue - } - e.rawEngineUpdates <- &buildenginepb.EngineEvent{ - Timestamp: timestamppb.Now(), - Event: &buildenginepb.EngineEvent_ModuleBuildFailed{ - ModuleBuildFailed: &buildenginepb.ModuleBuildFailed{ - Config: configProto, - IsAutoRebuild: true, - Errors: &langpb.ErrorList{ - Errors: errorToLangError(err), - }, - }, - }, - } - continue - } - - e.rawEngineUpdates <- &buildenginepb.EngineEvent{ - Timestamp: timestamppb.Now(), - Event: &buildenginepb.EngineEvent_ModuleBuildSuccess{ - ModuleBuildSuccess: &buildenginepb.ModuleBuildSuccess{ - Config: configProto, - IsAutoRebuild: true, - }, - }, - } - e.rebuildEvents <- autoRebuildCompletedEvent{module: event.ModuleName(), schema: moduleSch, tmpDeployDir: tmpDeployDir, deployPaths: deployPaths} - } - case languageplugin.PluginDiedEvent: - e.moduleMetas.Range(func(name string, meta moduleMeta) bool { - if meta.plugin != event.Plugin { - return true - } - logger := log.FromContext(originalCtx).Module(name) - logger.Errorf(event.Error, "Plugin died, recreating") - - c, err := moduleconfig.LoadConfig(meta.module.Config.Dir) - if err != nil { - logger.Errorf(err, "Could not recreate plugin: could not load config") - return false - } - newMeta, err := e.newModuleMeta(originalCtx, c) - if err != nil { - logger.Errorf(err, "Could not recreate plugin") - return false - } - e.moduleMetas.Store(name, newMeta) - e.rebuildEvents <- rebuildRequestEvent{module: name} - return false - }) - } - case <-originalCtx.Done(): - // kill all plugins - e.moduleMetas.Range(func(name string, meta moduleMeta) bool { - err := meta.plugin.Kill() - if err != nil { - log.FromContext(originalCtx).Errorf(err, "could not kill plugin") - } - return true - }) - return - } - } -} diff --git a/internal/buildengine/languageplugin/plugin.go b/internal/buildengine/languageplugin/plugin.go index 424c5e919e..6b07aa9ec8 100644 --- a/internal/buildengine/languageplugin/plugin.go +++ b/internal/buildengine/languageplugin/plugin.go @@ -8,20 +8,14 @@ import ( "time" "connectrpc.com/connect" - "github.com/alecthomas/atomic" errors "github.com/alecthomas/errors" "github.com/alecthomas/types/optional" - "github.com/alecthomas/types/pubsub" - "github.com/alecthomas/types/result" langpb "github.com/block/ftl/backend/protos/xyz/block/ftl/language/v1" "github.com/block/ftl/common/builderrors" - "github.com/block/ftl/common/log" "github.com/block/ftl/common/schema" - "github.com/block/ftl/internal/channels" "github.com/block/ftl/internal/moduleconfig" "github.com/block/ftl/internal/projectconfig" - "github.com/block/ftl/internal/watch" ) const BuildLockTimeout = time.Minute @@ -44,50 +38,12 @@ type BuildResult struct { // File that the runner can use to pass info into the hot reload endpoint HotReloadEndpoint optional.Option[string] HotReloadVersion optional.Option[int64] - modifiedFiles []string + ModifiedFiles []string DebugPort int redeployNotRequired bool } -// PluginEvent is used to notify of updates from the plugin. -// -//sumtype:decl -type PluginEvent interface { - pluginEvent() -} - -type PluginBuildEvent interface { - PluginEvent - ModuleName() string -} - -// AutoRebuildStartedEvent is sent when the plugin starts an automatic rebuild. -type AutoRebuildStartedEvent struct { - Module string -} - -func (AutoRebuildStartedEvent) pluginEvent() {} -func (e AutoRebuildStartedEvent) ModuleName() string { return e.Module } - -// AutoRebuildEndedEvent is sent when the plugin ends an automatic rebuild. -type AutoRebuildEndedEvent struct { - Module string - Result result.Result[BuildResult] -} - -func (AutoRebuildEndedEvent) pluginEvent() {} -func (e AutoRebuildEndedEvent) ModuleName() string { return e.Module } - -// PluginDiedEvent is sent when the plugin dies. -type PluginDiedEvent struct { - // Plugins do not always have an associated module name, so we include the module - Plugin *LanguagePlugin - Error error -} - -func (PluginDiedEvent) pluginEvent() {} - // BuildContext contains contextual information needed to build. // // Any change to the build context would require a new build. @@ -114,8 +70,6 @@ func New(ctx context.Context, dir, language, name string) (p *LanguagePlugin, er func newPluginForTesting(ctx context.Context, client pluginClient) *LanguagePlugin { plugin := &LanguagePlugin{ client: client, - updates: pubsub.New[PluginEvent](), - bctx: atomic.New[*buildInfo](nil), buildRunning: &sync.Mutex{}, } go plugin.watchForCmdError(ctx) @@ -126,10 +80,6 @@ func newPluginForTesting(ctx context.Context, client pluginClient) *LanguagePlug type LanguagePlugin struct { client pluginClient - // cancels the run() context - updates *pubsub.Topic[PluginEvent] - watch *pubsub.Topic[watch.WatchEvent] - bctx *atomic.Value[*buildInfo] buildRunning *sync.Mutex } @@ -144,12 +94,6 @@ func (p *LanguagePlugin) Kill() error { return nil } -// Updates topic for all update events from the plugin -// The same topic must be returned each time this method is called -func (p *LanguagePlugin) Updates() *pubsub.Topic[PluginEvent] { - return p.updates -} - // GetDependencies returns the dependencies of the module. func (p *LanguagePlugin) GetDependencies(ctx context.Context, config moduleconfig.ModuleConfig) ([]string, error) { configProto, err := langpb.ModuleConfigToProto(config.Abs()) @@ -218,11 +162,10 @@ func (p *LanguagePlugin) SyncStubReferences(ctx context.Context, config moduleco // Build builds the module with the latest config and schema. // In dev mode, plugin is responsible for automatically rebuilding as relevant files within the module change, // and publishing these automatic builds updates to Updates(). -func (p *LanguagePlugin) Build(ctx context.Context, projectConfig projectconfig.Config, stubsRoot string, bctx BuildContext, rebuildAutomatically bool) (BuildResult, error) { +func (p *LanguagePlugin) Build(ctx context.Context, projectConfig projectconfig.Config, stubsRoot string, bctx BuildContext, devModeBuild bool) (BuildResult, error) { p.buildRunning.Lock() defer p.buildRunning.Unlock() startTime := time.Now() - p.bctx.Store(&buildInfo{projectConfig: projectConfig, stubsRoot: stubsRoot, bctx: bctx}) configProto, err := langpb.ModuleConfigToProto(bctx.Config.Abs()) if err != nil { @@ -233,7 +176,7 @@ func (p *LanguagePlugin) Build(ctx context.Context, projectConfig projectconfig. result, err := p.client.build(ctx, connect.NewRequest(&langpb.BuildRequest{ ProjectConfig: langpb.ProjectConfigToProto(projectConfig), StubsRoot: stubsRoot, - DevModeBuild: rebuildAutomatically, + DevModeBuild: devModeBuild, BuildContext: &langpb.BuildContext{ ModuleConfig: configProto, Schema: schemaProto, @@ -245,58 +188,9 @@ func (p *LanguagePlugin) Build(ctx context.Context, projectConfig projectconfig. })) if err != nil { - return BuildResult{}, errors.Wrap(err, "failed to invoke build command") + return BuildResult{}, errors.WithStack(err) } - if rebuildAutomatically && p.watch == nil { - watcher := watch.NewWatcher(optional.None[string](), bctx.Config.Watch...) - updates, err := watcher.Watch(ctx, time.Second, []string{bctx.Config.Dir}) - if err != nil { - log.FromContext(ctx).Errorf(err, "Failed to watch module directory") - return buildResultFromProto(result.Msg, startTime) - } - p.watch = updates - go p.runWatch(ctx, watcher) - } - return buildResultFromProto(result.Msg, startTime) - -} - -func (p *LanguagePlugin) runWatch(ctx context.Context, watcher *watch.Watcher) { - logger := log.FromContext(ctx) - defer func() { - p.watch = nil - }() - updates := make(chan watch.WatchEvent) - p.watch.Subscribe(updates) - for i := range channels.IterContext(ctx, updates) { - if _, ok := i.(watch.WatchEventModuleChanged); ok { - info := p.bctx.Load() - tx := watcher.GetTransaction(info.bctx.Config.Dir) - err := tx.Begin() - if err != nil { - logger.Errorf(err, "Failed to start watch transaction") - } - p.updates.Publish(AutoRebuildStartedEvent{Module: info.bctx.Config.Module}) - br, err := p.Build(ctx, info.projectConfig, info.stubsRoot, info.bctx, true) - if err != nil { - p.updates.Publish(AutoRebuildEndedEvent{Module: info.bctx.Config.Module, Result: result.Err[BuildResult](err)}) - } else { - err = tx.ModifiedFiles(br.modifiedFiles...) - if err != nil { - if !br.redeployNotRequired { - p.updates.Publish(AutoRebuildEndedEvent{Module: info.bctx.Config.Module, Result: result.Err[BuildResult](err)}) - } - } else { - p.updates.Publish(AutoRebuildEndedEvent{Module: info.bctx.Config.Module, Result: result.Ok[BuildResult](br)}) - } - } - err = tx.End() - if err != nil { - logger.Errorf(err, "Failed to end watch transaction") - } - } - } } func buildResultFromProto(result *langpb.BuildResponse, startTime time.Time) (buildResult BuildResult, err error) { @@ -327,7 +221,7 @@ func buildResultFromProto(result *langpb.BuildResponse, startTime time.Time) (bu HotReloadEndpoint: optional.Ptr(buildSuccess.DevHotReloadEndpoint), HotReloadVersion: optional.Ptr(buildSuccess.DevHotReloadVersion), DebugPort: port, - modifiedFiles: buildSuccess.ModifiedFiles, + ModifiedFiles: buildSuccess.ModifiedFiles, redeployNotRequired: buildSuccess.RedeployNotRequired, }, nil case *langpb.BuildResponse_BuildFailure: @@ -350,17 +244,13 @@ func buildResultFromProto(result *langpb.BuildResponse, startTime time.Time) (bu StartTime: startTime, Errors: errs, InvalidateDependencies: buildFailure.InvalidateDependencies, - modifiedFiles: buildFailure.ModifiedFiles, + ModifiedFiles: buildFailure.ModifiedFiles, }, nil default: panic(fmt.Sprintf("unexpected result type %T", result)) } } -func contextID(config moduleconfig.ModuleConfig, counter int) string { - return fmt.Sprintf("%v-%v", config.Module, counter) -} - type buildInfo struct { projectConfig projectconfig.Config stubsRoot string @@ -374,10 +264,11 @@ func (p *LanguagePlugin) watchForCmdError(ctx context.Context) { // closed return } - p.updates.Publish(PluginDiedEvent{ - Plugin: p, - Error: err, - }) + // TODO: handle this + // p.updates.Publish(PluginDiedEvent{ + // Plugin: p, + // Error: err, + // }) case <-ctx.Done(): diff --git a/internal/watch/filehash.go b/internal/watch/filehash.go index 78b32847e4..c2668a5b69 100644 --- a/internal/watch/filehash.go +++ b/internal/watch/filehash.go @@ -78,9 +78,6 @@ func ComputeFileHashes(dir string, skipGitIgnoredFiles bool, patterns []string) return errors.WithStack(err) } if !matched { - if patterns[0] == "*" { - return errors.Errorf("file %s:%s does not match any: %s", rootDir, srcPath, patterns) - } return nil } fileHashes[srcPath] = hash diff --git a/internal/watch/watch.go b/internal/watch/watch.go index 6772672370..4801f3a937 100644 --- a/internal/watch/watch.go +++ b/internal/watch/watch.go @@ -287,6 +287,9 @@ func (t *modifyFilesTransaction) ModifiedFiles(paths ...string) error { if !t.isActive { return errors.Errorf("can not modify file because transaction is not active: %v", paths) } + if len(paths) == 0 { + return nil + } t.watcher.mutex.Lock() defer t.watcher.mutex.Unlock()