Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions changelog/spawn-by-ref.dd
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
Added spawnRef and spawnLinkedRef for passing by reference through spawn

Now it will be possible to pass variables to the thread created with spawnRef/spawnLinkedRef by reference,
to do this you need the shared type and add ref to the attributes of the arguments of the function to be executed.

---
import std.concurrency : spawnRef;
import core.atomic : atomicOp;
import core.thread : thread_joinAll;

static void f1(ref shared(int) number)
{
atomicOp!"+="(number, 1);
}

shared(int) number = 10;
spawnRef(&f1, number);
thread_joinAll();
assert(number == 11);
---
143 changes: 139 additions & 4 deletions std/concurrency.d
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,30 @@ private
static assert(!hasLocalAliasing!(SysTime, Container));
}

bool hasLocalReferencePassing(F, Types...)()
{
bool doesIt = false;
static foreach (i, T; Types)
{
static if (!is(T == shared) && !is(T == Tid))
{
static if (ParameterStorageClassTuple!F[i] == ParameterStorageClass.ref_)
{
doesIt = true;
}
}
}

return doesIt;
}

@safe unittest
{
static void fun(ref int) {}
static assert(hasLocalReferencePassing!(typeof(fun), int));
static assert(!hasLocalReferencePassing!(typeof(fun), shared(int)));
}

enum MsgType
{
standard,
Expand Down Expand Up @@ -539,11 +563,105 @@ if (isSpawnable!(F, T))
return _spawn(true, fn, args);
}

/*
/**
* Starts fn(args) in a new logical thread.
*
* Executes the supplied function in a new logical thread represented by
* `Tid`. The calling thread is designated as the owner of the new thread.
* When the owner thread terminates an `OwnerTerminated` message will be
* sent to the new thread, causing an `OwnerTerminated` exception to be
* thrown on `receive()`.
* Unlike spawn, spawnRef passes all arguments by reference to the function
* to be executed.
* If the function to be executed contains parameters with the "ref" attribute
* then the corresponding arguments passed to spawnRef must be "shared".
*
*
* Params:
* fn = The function to execute.
* args = Arguments to the function.
*
* Returns:
* A Tid representing the new logical thread.
*
* Notes:
* `args` must not have unshared aliasing. In other words, all arguments
* to `fn` must either be `shared` or `immutable` or have no
* pointer indirection. This is necessary for enforcing isolation among
* threads.
*/
private Tid _spawn(F, T...)(bool linked, F fn, T args)
Tid spawnRef(F, T...)(F fn, ref T args)
if (isSpawnable!(F, T))
{
static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
static assert(!hasLocalReferencePassing!(F, T), "Passing by reference thread-local data is not allowed.");
return _spawnRef(false, fn, args);
}

/**
* Starts fn(args) in a logical thread and will receive a LinkTerminated
* message when the operation terminates.
*
* Executes the supplied function in a new logical thread represented by
* Tid. This new thread is linked to the calling thread so that if either
* it or the calling thread terminates a LinkTerminated message will be sent
* to the other, causing a LinkTerminated exception to be thrown on receive().
* The owner relationship from spawn() is preserved as well, so if the link
* between threads is broken, owner termination will still result in an
* OwnerTerminated exception to be thrown on receive().
* Unlike spawnLinked, spawnLinkedRef passes all arguments by reference to the
* function to be executed.
* If the function to be executed contains parameters with the "ref" attribute
* then the corresponding arguments passed to spawnRef must be "shared".
*
* Params:
* fn = The function to execute.
* args = Arguments to the function.
*
* Returns:
* A Tid representing the new thread.
*/
Tid spawnLinkedRef(F, T...)(F fn, ref T args)
if (isSpawnable!(F, T))
{
static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
static assert(!hasLocalReferencePassing!(F, T), "Passing by reference thread-local data is not allowed.");
return _spawnRef(true, fn, args);
}

@system unittest
{
import core.atomic : atomicOp;
import core.thread : thread_joinAll;

static void f1(ref shared(int) number)
{
atomicOp!"+="(number, 1);
}

shared(int) number = 10;
spawnRef(&f1, number);
thread_joinAll();
assert(number == 11);
}

@system unittest
{
import core.atomic : atomicOp;

static void f1(ref shared(int) number)
{
atomicOp!"+="(number, 1);
}

int number = 10;
static assert(!__traits(compiles, spawnRef(&f1, number)));
}

/*
*
*/
private template implSpawnExec()
{
// TODO: MessageList and &exec should be shared.
auto spawnTid = Tid(new MessageBox);
Expand All @@ -555,19 +673,36 @@ if (isSpawnable!(F, T))
thisInfo.owner = ownerTid;
fn(args);
}
}

private Tid implSpawn(void delegate() execFun, Tid spawnTid, bool linked) {
// TODO: MessageList and &exec should be shared.
if (scheduler !is null)
scheduler.spawn(&exec);
scheduler.spawn(execFun);
else
{
auto t = new Thread(&exec);
auto t = new Thread(execFun);
t.start();
}
thisInfo.links[spawnTid] = linked;
return spawnTid;
}


private Tid _spawn(F, T...)(bool linked, F fn, T args)
if (isSpawnable!(F, T))
{
mixin implSpawnExec;
return implSpawn(&exec, spawnTid, linked);
}

private Tid _spawnRef(F, T...)(bool linked, F fn, ref T args)
if (isSpawnable!(F, T))
{
mixin implSpawnExec;
return implSpawn(&exec, spawnTid, linked);
}

@system unittest
{
void function() fn1;
Expand Down