Skip to content
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Python_Versions.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"revision": 2
"revision": 3
}
10 changes: 7 additions & 3 deletions sdks/go/pkg/beam/core/runtime/harness/datamgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,13 @@ func (m *DataChannelManager) Open(ctx context.Context, port exec.Port) (*DataCha
default:
log.Warnf(ctx, "forcing DataChannel[%v] reconnection on port %v due to %v", id, port, err)
}
m.mu.Lock()
delete(m.ports, port.URL)
m.mu.Unlock()
go func() {
m.mu.Lock()
defer m.mu.Unlock()
if curr, ok := m.ports[port.URL]; ok && curr == ch {
delete(m.ports, port.URL)
}
}()
}
m.ports[port.URL] = ch
return ch, nil
Expand Down
10 changes: 7 additions & 3 deletions sdks/go/pkg/beam/core/runtime/harness/statemgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,9 +619,13 @@ func (m *StateChannelManager) Open(ctx context.Context, port exec.Port) (*StateC
default:
log.Warnf(ctx, "forcing StateChannel[%v] reconnection on port %v due to %v", id, port, err)
}
m.mu.Lock()
delete(m.ports, port.URL)
m.mu.Unlock()
go func() {
m.mu.Lock()
defer m.mu.Unlock()
if curr, ok := m.ports[port.URL]; ok && curr == ch {
delete(m.ports, port.URL)
}
}()
}
m.ports[port.URL] = ch
return ch, nil
Expand Down
5 changes: 4 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/environments.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,12 @@ func externalEnvironment(ctx context.Context, ep *pipepb.ExternalPayload, wk *wo

// Previous context cancelled so we need a new one
// for this request.
pool.StopWorker(bgContext, &fnpb.StopWorkerRequest{
_, err = pool.StopWorker(bgContext, &fnpb.StopWorkerRequest{
WorkerId: wk.ID,
})
if err != nil {
slog.Warn("StopWorker failed", "worker", wk, "error", err)
}
wk.Stop()
}

Expand Down
10 changes: 10 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,16 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
// Log a heartbeat every 60 seconds
case <-ticker.C:
j.Logger.Info("pipeline is running", slog.String("job", j.String()))
j.Logger.Info("pipeline stages state", slog.String("stages", em.DumpStages()))
for envID, wk := range wks {
if wk != nil && wk.Connected() && !wk.Stopped() {
j.Logger.Info("worker status",
slog.String("workerID", wk.ID),
slog.String("envID", envID),
slog.Duration("uptime", wk.Uptime()),
slog.Any("active_bundles", wk.ActiveBundles()))
}
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (h *runner) handleReshuffle(tid string, t *pipepb.PTransform, comps *pipepb
}

// And all the sub transforms.
toRemove = append(toRemove, t.GetSubtransforms()...)
toRemove = append(toRemove, removeSubTransforms(comps, t.GetSubtransforms())...)

// Return the new components which is the transforms consumer
return prepareResult{
Expand Down
8 changes: 7 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, c
panic(err)
}

bundleStart := time.Now()

// Progress + split loop.
previousIndex := int64(-2)
previousTotalCount := int64(-2) // Total count of all pcollection elements.
Expand Down Expand Up @@ -232,7 +234,11 @@ progress:
md := wk.MonitoringMetadata(ctx, unknownIDs)
j.AddMetricShortIDs(md)
}
slog.Debug("progress report", "bundle", rb, "index", index, "prevIndex", previousIndex)
runningFor := time.Since(bundleStart)
slog.Debug("progress report", "bundle", rb, "runningFor", runningFor, "index", index, "prevIndex", previousIndex)
if runningFor > 5*time.Minute {
slog.Warn("Bundle has been running for a long time", "bundle", rb, "runningFor", runningFor, "worker", wk.ID)
}

// Check if there has been any measurable progress by the input, or all output pcollections since last report.
slow := previousIndex == index["index"] && previousTotalCount == index["totalCount"]
Expand Down
27 changes: 27 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type W struct {
// These are the ID sources
inst uint64
connected, stopped atomic.Bool
StartTime time.Time
StoppedChan chan struct{} // Channel to Broadcast stopped state.

InstReqs chan *fnpb.InstructionRequest
Expand Down Expand Up @@ -292,11 +293,37 @@ func (wk *W) Stopped() bool {
return wk.stopped.Load()
}

// Uptime reports the time elapsed since the worker's StartTime.
//
// StartTime is read while holding wk.mu. If it has not been set yet
// (it is still the zero time), Uptime returns 0.
func (wk *W) Uptime() time.Duration {
	// Snapshot the start time under the lock, then compute on the copy.
	wk.mu.Lock()
	started := wk.StartTime
	wk.mu.Unlock()

	if !started.IsZero() {
		return time.Since(started)
	}
	return 0
}

// ActiveBundles describes the bundles currently registered in the worker's
// activeInstructions.
//
// Each entry whose responder is a *B contributes one string of the form
// "<instruction id> (<PBDID>)"; responders of any other type are skipped.
// The result is nil when no such entry exists. Entries appear in whatever
// order ranging over activeInstructions yields them. The collection is read
// while holding wk.mu.
func (wk *W) ActiveBundles() []string {
	wk.mu.Lock()
	defer wk.mu.Unlock()

	var active []string
	for instID, r := range wk.activeInstructions {
		bundle, isBundle := r.(*B)
		if !isBundle {
			// Not a bundle responder; nothing to report for it.
			continue
		}
		active = append(active, fmt.Sprintf("%s (%s)", instID, bundle.PBDID))
	}
	return active
}

// Control relays instructions to SDKs and back again, coordinated via unique instructionIDs.
//
// Requests come from the runner, and are sent to the client in the SDK.
func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
wk.connected.Store(true)
wk.mu.Lock()
wk.StartTime = time.Now()
wk.mu.Unlock()
done := make(chan error, 1)
go func() {
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@ def main(argv):
with fully_qualified_named_transform.FullyQualifiedNamedTransform.with_filter(
known_args.fully_qualified_name_glob):

# Bind to localhost instead of 0.0.0.0 to ensure compatibility with loopback
# connections on dual-stack (IPv4/IPv6) systems.
address = 'localhost:{}'.format(known_args.port)
address = '0.0.0.0:{}'.format(known_args.port)
server = grpc.server(thread_pool_executor.shared_unbounded_instance())
if known_args.serve_loopback_worker:
beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server(
Expand All @@ -73,15 +71,9 @@ def main(argv):
artifact_service.ArtifactRetrievalService(
artifact_service.BeamFilesystemHandler(None).file_reader),
server)
# Ensure gRPC server successfully binds. If this fails (e.g., due to port collision),
# add_insecure_port returns 0. We raise an error to crash the subprocess immediately,
# allowing the parent process to detect it and fail fast rather than hanging.
bound_port = server.add_insecure_port(address)
if not bound_port:
raise RuntimeError(
"Failed to bind expansion service to {}".format(address))
server.add_insecure_port(address)
server.start()
_LOGGER.info('Listening for expansion requests at %d', bound_port)
_LOGGER.info('Listening for expansion requests at %d', known_args.port)

def cleanup(unused_signum, unused_frame):
_LOGGER.info('Shutting down expansion service.')
Expand Down
10 changes: 6 additions & 4 deletions sdks/python/apache_beam/runners/portability/stager.py
Original file line number Diff line number Diff line change
Expand Up @@ -732,9 +732,11 @@ def _get_platform_for_default_sdk_container():
# addressed, download wheel based on glibc version in Beam's Python
# Base image
pip_version = distribution('pip').version
if version.parse(pip_version) >= version.parse('19.3'):
# pip can only recognize manylinux2014_x86_64 wheels
# from version 19.3.
# See more information about manylinux at
# https://github.com/pypa/manylinux
if version.parse(pip_version) >= version.parse('20.3'):
return 'manylinux_2_28_x86_64'
elif version.parse(pip_version) >= version.parse('19.3'):
return 'manylinux2014_x86_64'
else:
return 'manylinux2010_x86_64'
Expand Down Expand Up @@ -795,7 +797,7 @@ def _populate_requirements_cache(
platform_tag
])
_LOGGER.info('Executing command: %s', cmd_args)
processes.check_output(cmd_args, stderr=processes.STDOUT)
processes.check_call(cmd_args)

# Get list of downloaded packages and copy them to the cache
downloaded_packages = set()
Expand Down
8 changes: 4 additions & 4 deletions sdks/python/apache_beam/runners/portability/stager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -985,13 +985,13 @@ def test_populate_requirements_cache_uses_find_links(self):

captured_cmd_args = []

def mock_check_output(cmd_args, **kwargs):
def mock_check_call(cmd_args, **kwargs):
captured_cmd_args.extend(cmd_args)
return b''
return 0

with mock.patch(
'apache_beam.runners.portability.stager.processes.check_output',
side_effect=mock_check_output):
'apache_beam.runners.portability.stager.processes.check_call',
side_effect=mock_check_call):
stager.Stager._populate_requirements_cache(
requirements_file, requirements_cache_dir)

Expand Down
Loading
Loading