1515 */
1616package org .lable .oss .uniqueid .etcd ;
1717
18- import io .etcd .jetcd .*;
18+ import io .etcd .jetcd .ByteSequence ;
19+ import io .etcd .jetcd .Client ;
20+ import io .etcd .jetcd .KeyValue ;
21+ import io .etcd .jetcd .Watch ;
1922import io .etcd .jetcd .kv .GetResponse ;
2023import io .etcd .jetcd .kv .TxnResponse ;
2124import io .etcd .jetcd .lease .LeaseGrantResponse ;
@@ -55,31 +58,35 @@ public class RegistryBasedResourceClaim {
5558
5659 final int poolSize ;
5760
58- RegistryBasedResourceClaim (Supplier <Client > connectToEtcd , int maxGeneratorCount , String registryEntry )
61+ RegistryBasedResourceClaim (Supplier <Client > connectToEtcd ,
62+ int maxGeneratorCount ,
63+ String registryEntry ,
64+ Duration acquisitionTimeout ,
65+ boolean waitWhenNoResourcesAvailable )
5966 throws IOException {
6067 this .registryEntry = registryEntry ;
6168 this .connectToEtcd = connectToEtcd ;
6269
63- logger .debug ("Acquiring resource-claim…" );
70+ logger .info ("Acquiring resource-claim…" );
6471
6572 Client etcd = connectToEtcd .get ();
6673
6774 List <Integer > clusterIds = ClusterID .get (etcd );
6875
69- Duration timeout = Duration .ofMinutes (5 );
76+ Duration timeout = acquisitionTimeout == null
77+ ? Duration .ofMinutes (5 )
78+ : acquisitionTimeout ;
7079 Instant giveUpAfter = Instant .now ().plus (timeout );
7180
7281 this .poolSize = maxGeneratorCount ;
7382
83+ ResourcePair resourcePair = null ;
7484 try {
75- LeaseGrantResponse lease = etcd .getLeaseClient ().grant (5 ).get ();
85+ logger .debug ("Acquiring lock, timeout is set to {}." , timeout );
86+ // Have the lease TTL just a bit after our timeout.
87+ LeaseGrantResponse lease = etcd .getLeaseClient ().grant (timeout .plusSeconds (5 ).getSeconds ()).get ();
7688 long leaseId = lease .getID ();
7789
78- // Keep the lease alive until we are done.
79- CloseableClient leaseKeepAlive = EtcdHelper .keepLeaseAlive (etcd , leaseId , null );
80-
81- // Release the lease when closed.
82-
8390 // Acquire the lock. This makes sure we are the only process claiming a resource.
8491 LockResponse lock ;
8592 try {
@@ -90,47 +97,80 @@ public class RegistryBasedResourceClaim {
9097 throw new IOException ("Process timed out." );
9198 }
9299
100+ // Keep the lease alive for another period in order to safely finish claiming the resource.
101+ etcd .getLeaseClient ().keepAliveOnce (leaseId ).get ();
102+
93103 if (logger .isDebugEnabled ()) {
94104 logger .debug ("Acquired lock: {}." , lock .getKey ().toString (StandardCharsets .UTF_8 ));
95105 }
96106
97- ResourcePair resourcePair = claimResource (etcd , maxGeneratorCount , clusterIds , giveUpAfter );
107+ resourcePair = claimResource (
108+ etcd , maxGeneratorCount , clusterIds , giveUpAfter , waitWhenNoResourcesAvailable
109+ );
98110 this .clusterId = resourcePair .clusterId ;
99111 this .generatorId = resourcePair .generatorId ;
100112
101- // Release the lock. If this line is not reached due to exceptions raised, the lock will automatically
102- // be removed when the lease holding it expires.
113+ // Explicitly release the lock. If this line is not reached due to exceptions raised, the lock will
114+ // automatically be removed when the lease holding it expires.
103115 etcd .getLockClient ().unlock (lock .getKey ()).get ();
116+ if (logger .isDebugEnabled ()) {
117+ logger .debug ("Released lock: {}." , lock .getKey ().toString (StandardCharsets .UTF_8 ));
118+ }
104119
105- leaseKeepAlive .close ();
120+ // Revoke the lease instead of letting it time out.
121+ etcd .getLeaseClient ().revoke (leaseId ).get ();
106122 } catch (ExecutionException e ) {
107- close ();
123+ if (resourcePair != null ) {
124+ close ();
125+ }
108126 throw new IOException (e );
109127 } catch (InterruptedException e ) {
110- close ();
128+ if (resourcePair != null ) {
129+ close ();
130+ }
111131 Thread .currentThread ().interrupt ();
112132 throw new IOException (e );
113133 }
114134
115135 logger .debug ("Resource-claim acquired ({}/{})." , clusterId , generatorId );
116136 }
117137
138+ /**
139+ * Claim a resource.
140+ *
141+ * @param connectToEtcd Provide a connection to Etcd.
142+ * @param maxGeneratorCount Maximum number of generators possible.
143+ * @param registryEntry Metadata stored under the Etcd key.
144+ * @param acquisitionTimeout Abort attempt to claim a resource after this duration.
145+ * @param waitWhenNoResourcesAvailable Wait for a resource to become available when all resources are claimed.
146+ * @return The resource claim, if successful.
147+ * @throws IOException Thrown when the claim could not be acquired.
148+ */
118149 public static RegistryBasedResourceClaim claim (Supplier <Client > connectToEtcd ,
119150 int maxGeneratorCount ,
120- String registryEntry ) throws IOException {
121- return new RegistryBasedResourceClaim (connectToEtcd , maxGeneratorCount , registryEntry );
151+ String registryEntry ,
152+ Duration acquisitionTimeout ,
153+ boolean waitWhenNoResourcesAvailable ) throws IOException {
154+ return new RegistryBasedResourceClaim (
155+ connectToEtcd , maxGeneratorCount , registryEntry , acquisitionTimeout , waitWhenNoResourcesAvailable
156+ );
122157 }
123158
124159 /**
125160 * Try to claim an available resource from the resource pool.
126161 *
127- * @param etcd Etcd connection.
128- * @param maxGeneratorCount Maximum number of generators possible.
129- * @param clusterIds Cluster Ids available to use.
130- * @param giveUpAfter Give up after this instant in time.
162+ * @param etcd Etcd connection.
163+ * @param maxGeneratorCount Maximum number of generators possible.
164+ * @param clusterIds Cluster Ids available to use.
165+ * @param giveUpAfter Give up after this instant in time.
166+ * @param waitWhenNoResourcesAvailable Wait for a resource to become available when all resources are claimed.
131167 * @return The claimed resource.
132168 */
133- ResourcePair claimResource (Client etcd , int maxGeneratorCount , List <Integer > clusterIds , Instant giveUpAfter )
169+ ResourcePair claimResource (Client etcd ,
170+ int maxGeneratorCount ,
171+ List <Integer > clusterIds ,
172+ Instant giveUpAfter ,
173+ boolean waitWhenNoResourcesAvailable )
134174 throws InterruptedException , IOException , ExecutionException {
135175
136176 logger .debug ("Trying to claim a resource." );
@@ -148,7 +188,12 @@ ResourcePair claimResource(Client etcd, int maxGeneratorCount, List<Integer> clu
148188 .collect (Collectors .toList ());
149189
150190 if (claimedResources .size () >= registrySize ) {
151- logger .debug ("No resources available at the moment (registry size: {}), waiting." , registrySize );
191+ if (!waitWhenNoResourcesAvailable ) {
192+ throw new IOException (
193+ "No resources available. Giving up as requested. Registry size: " + registrySize + "."
194+ );
195+ }
196+ logger .warn ("No resources available at the moment (registry size: {}), waiting." , registrySize );
152197 // No resources available. Wait for a resource to become available.
153198 final CountDownLatch latch = new CountDownLatch (1 );
154199 Watch .Watcher watcher = etcd .getWatchClient ().watch (
@@ -158,7 +203,7 @@ ResourcePair claimResource(Client etcd, int maxGeneratorCount, List<Integer> clu
158203 );
159204 awaitLatchUnlessItTakesTooLong (latch , giveUpAfter );
160205 watcher .close ();
161- return claimResource (etcd , maxGeneratorCount , clusterIds , giveUpAfter );
206+ return claimResource (etcd , maxGeneratorCount , clusterIds , giveUpAfter , true );
162207 }
163208
164209 // Try to claim an available resource.
@@ -185,13 +230,13 @@ ResourcePair claimResource(Client etcd, int maxGeneratorCount, List<Integer> clu
185230 continue ;
186231 }
187232
188- logger .debug ("Successfully claimed resource {}." , resourcePathString );
233+ logger .info ("Successfully claimed resource {}." , resourcePathString );
189234 return new ResourcePair (clusterId , generatorId );
190235 }
191236 }
192237 }
193238
194- return claimResource (etcd , maxGeneratorCount , clusterIds , giveUpAfter );
239+ return claimResource (etcd , maxGeneratorCount , clusterIds , giveUpAfter , waitWhenNoResourcesAvailable );
195240 }
196241
197242 static String resourceKey (Integer clusterId , int generatorId ) {
0 commit comments