Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions src/MongoDB.Driver/Core/Bindings/ChannelChannelSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,12 @@ internal sealed class ChannelChannelSource : IChannelSource
private readonly IChannelHandle _channel;
private bool _disposed;
private readonly IServer _server;
private readonly ICoreSessionHandle _session;

// constructors
public ChannelChannelSource(IServer server, IChannelHandle channel, ICoreSessionHandle session)
public ChannelChannelSource(IServer server, IChannelHandle channel)
{
_server = Ensure.IsNotNull(server, nameof(server));
_channel = Ensure.IsNotNull(channel, nameof(channel));
_session = Ensure.IsNotNull(session, nameof(session));
}

// properties
Expand All @@ -47,18 +45,12 @@ public ServerDescription ServerDescription
get { return _server.Description; }
}

public ICoreSessionHandle Session
{
get { return _session; }
}

// methods
public void Dispose()
{
if (!_disposed)
{
_channel.Dispose();
_session.Dispose();
_disposed = true;
}
}
Expand Down
14 changes: 3 additions & 11 deletions src/MongoDB.Driver/Core/Bindings/ChannelReadBinding.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright 2013-present MongoDB Inc.
/* Copyright 2010-present MongoDB Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,32 +27,24 @@ internal sealed class ChannelReadBinding : IReadBinding
private bool _disposed;
private readonly ReadPreference _readPreference;
private readonly IServer _server;
private readonly ICoreSessionHandle _session;

public ChannelReadBinding(IServer server, IChannelHandle channel, ReadPreference readPreference, ICoreSessionHandle session)
public ChannelReadBinding(IServer server, IChannelHandle channel, ReadPreference readPreference)
{
_server = Ensure.IsNotNull(server, nameof(server));
_channel = Ensure.IsNotNull(channel, nameof(channel));
_readPreference = Ensure.IsNotNull(readPreference, nameof(readPreference));
_session = Ensure.IsNotNull(session, nameof(session));
}

public ReadPreference ReadPreference
{
get { return _readPreference; }
}

public ICoreSessionHandle Session
{
get { return _session; }
}

public void Dispose()
{
if (!_disposed)
{
_channel.Dispose();
_session.Dispose();
_disposed = true;
}
}
Expand Down Expand Up @@ -81,7 +73,7 @@ public Task<IChannelSourceHandle> GetReadChannelSourceAsync(OperationContext ope

private IChannelSourceHandle GetReadChannelSourceHelper()
{
return new ChannelSourceHandle(new ChannelChannelSource(_server, _channel.Fork(), _session.Fork()));
return new ChannelSourceHandle(new ChannelChannelSource(_server, _channel.Fork()));
}

private void ThrowIfDisposed()
Expand Down
14 changes: 3 additions & 11 deletions src/MongoDB.Driver/Core/Bindings/ChannelReadWriteBinding.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright 2013-present MongoDB Inc.
/* Copyright 2010-present MongoDB Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -26,31 +26,23 @@ internal sealed class ChannelReadWriteBinding : IReadWriteBinding
private readonly IChannelHandle _channel;
private bool _disposed;
private readonly IServer _server;
private readonly ICoreSessionHandle _session;

public ChannelReadWriteBinding(IServer server, IChannelHandle channel, ICoreSessionHandle session)
public ChannelReadWriteBinding(IServer server, IChannelHandle channel)
{
_server = Ensure.IsNotNull(server, nameof(server));
_channel = Ensure.IsNotNull(channel, nameof(channel));
_session = Ensure.IsNotNull(session, nameof(session));
}

public ReadPreference ReadPreference
{
get { return ReadPreference.Primary; }
}

public ICoreSessionHandle Session
{
get { return _session; }
}

public void Dispose()
{
if (!_disposed)
{
_channel.Dispose();
_session.Dispose();
_disposed = true;
}
}
Expand Down Expand Up @@ -121,7 +113,7 @@ public Task<IChannelSourceHandle> GetWriteChannelSourceAsync(OperationContext op

private IChannelSourceHandle GetChannelSourceHelper()
{
return new ChannelSourceHandle(new ChannelChannelSource(_server, _channel.Fork(), _session.Fork()));
return new ChannelSourceHandle(new ChannelChannelSource(_server, _channel.Fork()));
}

private void ThrowIfDisposed()
Expand Down
7 changes: 1 addition & 6 deletions src/MongoDB.Driver/Core/Bindings/ChannelSourceHandle.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright 2013-present MongoDB Inc.
/* Copyright 2010-present MongoDB Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,11 +48,6 @@ public ServerDescription ServerDescription
get { return _reference.Instance.ServerDescription; }
}

public ICoreSessionHandle Session
{
get { return _reference.Instance.Session; }
}

// methods
public IChannelHandle GetChannel(OperationContext operationContext)
{
Expand Down
40 changes: 33 additions & 7 deletions src/MongoDB.Driver/Core/Bindings/CoreSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,14 @@ void ICoreSessionInternal.AbortTransaction(AbortTransactionOptions options, Canc
{
EnsureAbortTransactionCanBeCalled(nameof(AbortTransaction));

using var operationContext = new OperationContext(GetTimeout(options?.Timeout), "abortTransaction", "admin", null, _currentTransaction.IsTracingEnabled, cancellationToken);
using var sessionHandle = new NonDisposingCoreSessionHandle(this);
using var operationContext = new OperationContext(sessionHandle, GetTimeout(options?.Timeout), cancellationToken)
{
IsTracingEnabled = _currentTransaction.IsTracingEnabled,
OperationName = "abortTransaction",
DatabaseName = "admin",
CollectionName = null
};
try
{
if (_currentTransaction.IsEmpty)
Expand Down Expand Up @@ -189,7 +196,14 @@ async Task ICoreSessionInternal.AbortTransactionAsync(AbortTransactionOptions op
{
EnsureAbortTransactionCanBeCalled(nameof(AbortTransaction));

using var operationContext = new OperationContext(GetTimeout(options?.Timeout), "abortTransaction", "admin", null, _currentTransaction.IsTracingEnabled, cancellationToken);
using var sessionHandle = new NonDisposingCoreSessionHandle(this);
using var operationContext = new OperationContext(sessionHandle, GetTimeout(options?.Timeout), cancellationToken)
{
IsTracingEnabled = _currentTransaction.IsTracingEnabled,
OperationName = "abortTransaction",
DatabaseName = "admin",
CollectionName = null
};
try
{
if (_currentTransaction.IsEmpty)
Expand Down Expand Up @@ -275,8 +289,14 @@ public void CommitTransaction(CancellationToken cancellationToken = default)
void ICoreSessionInternal.CommitTransaction(CommitTransactionOptions options, CancellationToken cancellationToken)
{
EnsureCommitTransactionCanBeCalled(nameof(CommitTransaction));

using var operationContext = new OperationContext(GetTimeout(options?.Timeout), "commitTransaction", "admin", null, _currentTransaction.IsTracingEnabled, cancellationToken);
using var sessionHandle = new NonDisposingCoreSessionHandle(this);
using var operationContext = new OperationContext(sessionHandle, GetTimeout(options?.Timeout), cancellationToken)
{
IsTracingEnabled = _currentTransaction.IsTracingEnabled,
OperationName = "commitTransaction",
DatabaseName = "admin",
CollectionName = null
};
try
{
_isCommitTransactionInProgress = true;
Expand Down Expand Up @@ -304,8 +324,14 @@ public Task CommitTransactionAsync(CancellationToken cancellationToken = default
async Task ICoreSessionInternal.CommitTransactionAsync(CommitTransactionOptions options, CancellationToken cancellationToken)
{
EnsureCommitTransactionCanBeCalled(nameof(CommitTransaction));

using var operationContext = new OperationContext(GetTimeout(options?.Timeout), "commitTransaction", "admin", null, _currentTransaction.IsTracingEnabled, cancellationToken);
using var sessionHandle = new NonDisposingCoreSessionHandle(this);
using var operationContext = new OperationContext(sessionHandle, GetTimeout(options?.Timeout), cancellationToken)
{
IsTracingEnabled = _currentTransaction.IsTracingEnabled,
OperationName = "commitTransaction",
DatabaseName = "admin",
CollectionName = null
};
try
{
_isCommitTransactionInProgress = true;
Expand Down Expand Up @@ -560,7 +586,7 @@ private IReadWriteBinding CreateEndTransactionBinding(ICoreSessionHandle session
// otherwise the captured pinned-channel fork goes stale after UnpinAll.
if (_cluster.Description.Type == ClusterType.LoadBalanced)
{
return new EndTransactionReadWriteBinding(_cluster, session);
return new EndTransactionReadWriteBinding(_cluster);
}

return ChannelPinningHelper.CreateReadWriteBinding(_cluster, session);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,72 +31,110 @@ internal sealed class EndTransactionReadWriteBinding : IReadWriteBinding
#pragma warning restore CA2213 // Disposable fields should be disposed
private bool _disposed;
private IReadWriteBindingHandle _innerBinding;
private readonly ICoreSessionHandle _session;

public EndTransactionReadWriteBinding(IClusterInternal cluster, ICoreSessionHandle session)
public EndTransactionReadWriteBinding(IClusterInternal cluster)
{
_cluster = Ensure.IsNotNull(cluster, nameof(cluster));
_session = Ensure.IsNotNull(session, nameof(session));
_innerBinding = ChannelPinningHelper.CreateReadWriteBinding(_cluster, _session.Fork());
}

public ReadPreference ReadPreference => ReadPreference.Primary;

public ICoreSessionHandle Session => _session;

// Called by EndTransactionOperation.OnRetry between attempts.
public void RebuildInnerBinding()
{
ThrowIfDisposed();
_innerBinding.Dispose();
_innerBinding = ChannelPinningHelper.CreateReadWriteBinding(_cluster, _session.Fork());
_innerBinding?.Dispose();
_innerBinding = null;
}

public void Dispose()
{
if (!_disposed)
{
_innerBinding.Dispose();
_session.Dispose();
_innerBinding?.Dispose();
_disposed = true;
}
}

public IChannelSourceHandle GetReadChannelSource(OperationContext operationContext)
=> _innerBinding.GetReadChannelSource(operationContext);
{
EnsureInnerBinding(operationContext);
return _innerBinding.GetReadChannelSource(operationContext);
}

public Task<IChannelSourceHandle> GetReadChannelSourceAsync(OperationContext operationContext)
=> _innerBinding.GetReadChannelSourceAsync(operationContext);
{
EnsureInnerBinding(operationContext);
return _innerBinding.GetReadChannelSourceAsync(operationContext);
}

public IChannelSourceHandle GetReadChannelSource(OperationContext operationContext, IReadOnlyCollection<ServerDescription> deprioritizedServers)
=> _innerBinding.GetReadChannelSource(operationContext, deprioritizedServers);
{
EnsureInnerBinding(operationContext);
return _innerBinding.GetReadChannelSource(operationContext, deprioritizedServers);
}

public Task<IChannelSourceHandle> GetReadChannelSourceAsync(OperationContext operationContext, IReadOnlyCollection<ServerDescription> deprioritizedServers)
=> _innerBinding.GetReadChannelSourceAsync(operationContext, deprioritizedServers);
{
EnsureInnerBinding(operationContext);
return _innerBinding.GetReadChannelSourceAsync(operationContext, deprioritizedServers);
}

public IChannelSourceHandle GetWriteChannelSource(OperationContext operationContext)
=> _innerBinding.GetWriteChannelSource(operationContext);
{
EnsureInnerBinding(operationContext);
return _innerBinding.GetWriteChannelSource(operationContext);
}

public IChannelSourceHandle GetWriteChannelSource(OperationContext operationContext, IReadOnlyCollection<ServerDescription> deprioritizedServers)
=> _innerBinding.GetWriteChannelSource(operationContext, deprioritizedServers);
{
EnsureInnerBinding(operationContext);
return _innerBinding.GetWriteChannelSource(operationContext, deprioritizedServers);
}

public IChannelSourceHandle GetWriteChannelSource(OperationContext operationContext, IMayUseSecondaryCriteria mayUseSecondary)
=> _innerBinding.GetWriteChannelSource(operationContext, mayUseSecondary);
{
EnsureInnerBinding(operationContext);
return _innerBinding.GetWriteChannelSource(operationContext, mayUseSecondary);
}

public IChannelSourceHandle GetWriteChannelSource(OperationContext operationContext, IReadOnlyCollection<ServerDescription> deprioritizedServers, IMayUseSecondaryCriteria mayUseSecondary)
=> _innerBinding.GetWriteChannelSource(operationContext, deprioritizedServers, mayUseSecondary);
{
EnsureInnerBinding(operationContext);
return _innerBinding.GetWriteChannelSource(operationContext, deprioritizedServers, mayUseSecondary);
}

public Task<IChannelSourceHandle> GetWriteChannelSourceAsync(OperationContext operationContext)
=> _innerBinding.GetWriteChannelSourceAsync(operationContext);
{
EnsureInnerBinding(operationContext);
return _innerBinding.GetWriteChannelSourceAsync(operationContext);
}

public Task<IChannelSourceHandle> GetWriteChannelSourceAsync(OperationContext operationContext, IReadOnlyCollection<ServerDescription> deprioritizedServers)
=> _innerBinding.GetWriteChannelSourceAsync(operationContext, deprioritizedServers);
{
EnsureInnerBinding(operationContext);
return _innerBinding.GetWriteChannelSourceAsync(operationContext, deprioritizedServers);
}

public Task<IChannelSourceHandle> GetWriteChannelSourceAsync(OperationContext operationContext, IMayUseSecondaryCriteria mayUseSecondary)
=> _innerBinding.GetWriteChannelSourceAsync(operationContext, mayUseSecondary);
{
EnsureInnerBinding(operationContext);
return _innerBinding.GetWriteChannelSourceAsync(operationContext, mayUseSecondary);
}

public Task<IChannelSourceHandle> GetWriteChannelSourceAsync(OperationContext operationContext, IReadOnlyCollection<ServerDescription> deprioritizedServers, IMayUseSecondaryCriteria mayUseSecondary)
=> _innerBinding.GetWriteChannelSourceAsync(operationContext, deprioritizedServers, mayUseSecondary);
{
EnsureInnerBinding(operationContext);
return _innerBinding.GetWriteChannelSourceAsync(operationContext, deprioritizedServers, mayUseSecondary);
}

private void EnsureInnerBinding(OperationContext operationContext)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we worry about thread safety for this?

{
if (_innerBinding == null)
{
_innerBinding = ChannelPinningHelper.CreateReadWriteBinding(_cluster, operationContext.Session);
}
}

private void ThrowIfDisposed()
{
Expand Down
1 change: 0 additions & 1 deletion src/MongoDB.Driver/Core/Bindings/IBinding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ namespace MongoDB.Driver.Core.Bindings
{
internal interface IBinding : IDisposable
{
ICoreSessionHandle Session { get; }
}

internal interface IReadBinding : IBinding
Expand Down
2 changes: 0 additions & 2 deletions src/MongoDB.Driver/Core/Bindings/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ internal interface IChannel : IDisposable

TResult Command<TResult>(
OperationContext operationContext,
ICoreSession session,
ReadPreference readPreference,
DatabaseNamespace databaseNamespace,
BsonDocument command,
Expand All @@ -47,7 +46,6 @@ TResult Command<TResult>(

Task<TResult> CommandAsync<TResult>(
OperationContext operationContext,
ICoreSession session,
ReadPreference readPreference,
DatabaseNamespace databaseNamespace,
BsonDocument command,
Expand Down
1 change: 0 additions & 1 deletion src/MongoDB.Driver/Core/Bindings/IChannelSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ internal interface IChannelSource : IDisposable
{
IServer Server { get; }
ServerDescription ServerDescription { get; }
ICoreSessionHandle Session { get; }

IChannelHandle GetChannel(OperationContext operationContext);
Task<IChannelHandle> GetChannelAsync(OperationContext operationContext);
Expand Down
Loading