Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/PTO_IR_manual.md
Original file line number Diff line number Diff line change
Expand Up @@ -9371,6 +9371,16 @@ pto.comm.tget(%dst, %src, buf(%ping, %pong) : !pto.partition_tensor_view<128xf32
- `signal` must be a GM-shaped value with element type `i32`.
- `value` / `cmpValue` must be signless integer scalars.

**Lowering ordering guarantee:**

- `pto.comm.tnotify` is lowered with a `pipe_barrier(PIPE_ALL)` emitted
immediately before the `pto::comm::TNOTIFY(...)` call. `TNOTIFY_IMPL` writes
the signal on the scalar pipe and only issues its trailing barrier *after*
the store, so this preceding drain is what makes the
`peer_TWAIT_returns ⇒ everything I issued before my TNOTIFY is visible`
contract hold across `pto.tload` / `pto.tstore` (local or peer-addressed).
Callers do not need to insert manual sync.

**Examples:**

```mlir
Expand Down
17 changes: 17 additions & 0 deletions lib/PTO/Transforms/PTOToEmitC.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6387,6 +6387,20 @@ static std::string notifyOpTok(pto::NotifyOp op) {
return "pto::comm::NotifyOp::Set";
}

// Issue #711: TNOTIFY writes its signal on the scalar pipe, and
// TNOTIFY_IMPL's trailing pipe_barrier(PIPE_ALL) runs *after* that store.
// If any prior pto.tload / pto.tstore (local or peer) is still in flight on
// an MTE pipe when the signal lands, the receiver's matching TWAIT can
// return before the data is visible. The lowering of pto.comm.tnotify must
// drain MTE-side pipes itself; callers cannot be required to insert sync.
static void emitTNotifyMteDrain(ConversionPatternRewriter &rewriter,
Location loc) {
auto *ctx = rewriter.getContext();
auto args = rewriter.getArrayAttr({emitc::OpaqueAttr::get(ctx, "PIPE_ALL")});
rewriter.create<emitc::CallOpaqueOp>(loc, TypeRange{}, "pipe_barrier", args,
ArrayAttr{}, ValueRange{});
}

static std::string waitCmpTok(pto::WaitCmp cmp) {
switch (cmp) {
case pto::WaitCmp::EQ:
Expand Down Expand Up @@ -6641,6 +6655,9 @@ struct PTOSignalCommToEmitC : public OpConversionPattern<SignalOp> {
rewriter, op.getLoc(), notifyTy, notifyOpTok(op.getNotifyOp()));
SmallVector<Value> operands{*signalGT, peelUnrealized(adaptor.getValue()),
notifyOp};
// See emitTNotifyMteDrain comment: drain in-flight MTE work before the
// scalar-pipe signal store so the notify/wait handshake is honored.
emitTNotifyMteDrain(rewriter, op.getLoc());
rewriter.create<emitc::CallOpaqueOp>(op.getLoc(), TypeRange{}, callee,
ArrayAttr{}, ArrayAttr{}, operands);
rewriter.eraseOp(op);
Expand Down
129 changes: 129 additions & 0 deletions test/lit/pto/issue711_tnotify_mte_drain.pto
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright (c) 2026 Huawei Technologies Co., Ltd.
// This program is free software, you can redistribute it and/or modify it under the terms and conditions of
// CANN Open Software License Agreement Version 2.0 (the "License").
// Please refer to the License for details. You may not use this file except in compliance with the License.
// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
// INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
// See LICENSE in the root of the software repository for the full text of the License.

// Regression for issue #711: pto.comm.tnotify lowering must drain MTE-side
// pipes before emitting pto::comm::TNOTIFY(...). TNOTIFY_IMPL writes the
// signal on the scalar pipe and only does pipe_barrier(PIPE_ALL) *after* the
// store, so without an MTE drain the signal can overtake an in-flight
// pto.tload / pto.tstore (local or peer) and the receiver's matching TWAIT
// returns before the data is visible.

// RUN: ptoas --pto-arch=a3 %s -o - 2>&1 | FileCheck %s

module {
// tstore -> tnotify: the data-producing case (signal must follow the store).
func.func @tnotify_drain_after_tstore(
%src_ptr: !pto.ptr<f32>,
%dst_ptr: !pto.ptr<f32>,
%signal_ptr: !pto.ptr<i32>)
attributes {pto.kernel_kind = #pto.kernel_kind<vector>} {
%c0 = arith.constant 0 : index
%c1 = arith.constant 1 : index
%c32 = arith.constant 32 : index
%v_i32 = arith.constant 1 : i32

%tile = pto.alloc_tile :
!pto.tile_buf<loc=vec, dtype=f32, rows=1, cols=32, v_row=1, v_col=32, blayout=row_major, slayout=none_box, fractal=512, pad=0>

%src_view = pto.make_tensor_view %src_ptr,
shape = [%c1, %c32], strides = [%c32, %c1] : !pto.tensor_view<?x?xf32>
%src = pto.partition_view %src_view,
offsets = [%c0, %c0], sizes = [%c1, %c32]
: !pto.tensor_view<?x?xf32> -> !pto.partition_tensor_view<1x32xf32>

%dst_view = pto.make_tensor_view %dst_ptr,
shape = [%c1, %c32], strides = [%c32, %c1] : !pto.tensor_view<?x?xf32>
%dst = pto.partition_view %dst_view,
offsets = [%c0, %c0], sizes = [%c1, %c32]
: !pto.tensor_view<?x?xf32> -> !pto.partition_tensor_view<1x32xf32>

pto.tload ins(%src : !pto.partition_tensor_view<1x32xf32>)
outs(%tile : !pto.tile_buf<loc=vec, dtype=f32, rows=1, cols=32, v_row=1, v_col=32, blayout=row_major, slayout=none_box, fractal=512, pad=0>)
pto.tstore ins(%tile : !pto.tile_buf<loc=vec, dtype=f32, rows=1, cols=32, v_row=1, v_col=32, blayout=row_major, slayout=none_box, fractal=512, pad=0>)
outs(%dst : !pto.partition_tensor_view<1x32xf32>)

%sig_view = pto.make_tensor_view %signal_ptr,
shape = [%c1], strides = [%c1] : !pto.tensor_view<1xi32>
%sig = pto.partition_view %sig_view,
offsets = [%c0], sizes = [%c1]
: !pto.tensor_view<1xi32> -> !pto.partition_tensor_view<1xi32>
pto.comm.tnotify(%sig, %v_i32 : !pto.partition_tensor_view<1xi32>, i32)
{notifyOp = #pto<notify_op set>}
return
}

// tload -> tnotify: the input-consumed case (notify must follow the load
// so the producer can reuse the source buffer once TWAIT returns).
func.func @tnotify_drain_after_tload(
%src_ptr: !pto.ptr<f32>,
%signal_ptr: !pto.ptr<i32>)
attributes {pto.kernel_kind = #pto.kernel_kind<vector>} {
%c0 = arith.constant 0 : index
%c1 = arith.constant 1 : index
%c32 = arith.constant 32 : index
%v_i32 = arith.constant 1 : i32

%tile = pto.alloc_tile :
!pto.tile_buf<loc=vec, dtype=f32, rows=1, cols=32, v_row=1, v_col=32, blayout=row_major, slayout=none_box, fractal=512, pad=0>

%src_view = pto.make_tensor_view %src_ptr,
shape = [%c1, %c32], strides = [%c32, %c1] : !pto.tensor_view<?x?xf32>
%src = pto.partition_view %src_view,
offsets = [%c0, %c0], sizes = [%c1, %c32]
: !pto.tensor_view<?x?xf32> -> !pto.partition_tensor_view<1x32xf32>

pto.tload ins(%src : !pto.partition_tensor_view<1x32xf32>)
outs(%tile : !pto.tile_buf<loc=vec, dtype=f32, rows=1, cols=32, v_row=1, v_col=32, blayout=row_major, slayout=none_box, fractal=512, pad=0>)

%sig_view = pto.make_tensor_view %signal_ptr,
shape = [%c1], strides = [%c1] : !pto.tensor_view<1xi32>
%sig = pto.partition_view %sig_view,
offsets = [%c0], sizes = [%c1]
: !pto.tensor_view<1xi32> -> !pto.partition_tensor_view<1xi32>
pto.comm.tnotify(%sig, %v_i32 : !pto.partition_tensor_view<1xi32>, i32)
{notifyOp = #pto<notify_op atomic_add>}
return
}

// twait must NOT receive the new MTE drain -- the receiver-side wait does
// not write any peer-visible data, so adding a barrier there would only be
// a needless stall. Pair this with --implicit-check-not to lock that in.
func.func @twait_no_extra_drain(
%signal_ptr: !pto.ptr<i32>)
attributes {pto.kernel_kind = #pto.kernel_kind<vector>} {
%c0 = arith.constant 0 : index
%c1 = arith.constant 1 : index
%v_i32 = arith.constant 1 : i32

%sig_view = pto.make_tensor_view %signal_ptr,
shape = [%c1], strides = [%c1] : !pto.tensor_view<1xi32>
%sig = pto.partition_view %sig_view,
offsets = [%c0], sizes = [%c1]
: !pto.tensor_view<1xi32> -> !pto.partition_tensor_view<1xi32>
pto.comm.twait(%sig, %v_i32 : !pto.partition_tensor_view<1xi32>, i32)
{cmp = #pto<wait_cmp ge>}
return
}
}

// CHECK-LABEL: AICORE void tnotify_drain_after_tstore(
// CHECK: pto::comm::NotifyOp{{.*}}= pto::comm::NotifyOp::Set;
// CHECK: TLOAD(
// CHECK: TSTORE(
// CHECK: pipe_barrier(PIPE_ALL);
// CHECK-NEXT: pto::comm::TNOTIFY(

// CHECK-LABEL: AICORE void tnotify_drain_after_tload(
// CHECK: pto::comm::NotifyOp{{.*}}= pto::comm::NotifyOp::AtomicAdd;
// CHECK: TLOAD(
// CHECK: pipe_barrier(PIPE_ALL);
// CHECK-NEXT: pto::comm::TNOTIFY(

// CHECK-LABEL: AICORE void twait_no_extra_drain(
// CHECK-NOT: pipe_barrier(PIPE_ALL);
// CHECK: pto::comm::TWAIT(
Loading