-
Notifications
You must be signed in to change notification settings - Fork 1.1k
feat: scale up connection worker pool based on latency #13384
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -1881,12 +1881,25 @@ void setRequestSendQueueTime() { | |||||
|
|
||||||
| /** Returns the current workload of this worker. */ | ||||||
| public Load getLoad() { | ||||||
| return Load.create( | ||||||
| inflightBytes, | ||||||
| inflightRequests, | ||||||
| destinationSet.size(), | ||||||
| maxInflightBytes, | ||||||
| maxInflightRequests); | ||||||
| this.lock.lock(); | ||||||
| try { | ||||||
| Duration timeSinceLastCallback = Duration.ZERO; | ||||||
| if (!inflightRequestQueue.isEmpty()) { | ||||||
| AppendRequestAndResponse head = inflightRequestQueue.peekFirst(); | ||||||
| if (head != null && head.requestSendTimeStamp != null) { | ||||||
| timeSinceLastCallback = Duration.between(head.requestSendTimeStamp, Instant.now()); | ||||||
| } | ||||||
| } | ||||||
| return Load.create( | ||||||
| timeSinceLastCallback, | ||||||
| inflightBytes, | ||||||
| inflightRequests, | ||||||
| destinationSet.size(), | ||||||
| maxInflightBytes, | ||||||
| maxInflightRequests); | ||||||
| } finally { | ||||||
| this.lock.unlock(); | ||||||
| } | ||||||
|
Comment on lines
+1884
to
+1902
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Acquiring To avoid this, we can maintain a Instant oldestTimestamp = this.oldestInflightRequestSendTimeStamp;
Duration timeSinceLastCallback = Duration.ZERO;
if (oldestTimestamp != null) {
timeSinceLastCallback = Duration.between(oldestTimestamp, Instant.now());
}
return Load.create(
timeSinceLastCallback,
inflightBytes,
inflightRequests,
destinationSet.size(),
maxInflightBytes,
maxInflightRequests);References
|
||||||
| } | ||||||
|
|
||||||
| /** | ||||||
|
|
@@ -1896,11 +1909,15 @@ public Load getLoad() { | |||||
| @AutoValue | ||||||
| public abstract static class Load { | ||||||
|
|
||||||
| // Consider the load on this worker to be overwhelmed when above some percentage of | ||||||
| // in-flight bytes or in-flight requests count. | ||||||
| // Consider the load on this worker to be overwhelmed when above some inflight latency or | ||||||
| // percentage of in-flight bytes or in-flight requests count. | ||||||
| private static Duration overwhelmedTimeSinceLastCallback = Duration.ofSeconds(3); | ||||||
| private static double overwhelmedInflightCount = 0.2; | ||||||
| private static double overwhelmedInflightBytes = 0.2; | ||||||
|
|
||||||
| // Time we have spent waiting for a response in the worker. | ||||||
| abstract Duration timeSinceLastCallback(); | ||||||
|
|
||||||
| // Number of in-flight requests bytes in the worker. | ||||||
| abstract long inFlightRequestsBytes(); | ||||||
|
|
||||||
|
|
@@ -1917,12 +1934,14 @@ public abstract static class Load { | |||||
| abstract long maxInflightCount(); | ||||||
|
|
||||||
| static Load create( | ||||||
| Duration timeSinceLastCallback, | ||||||
| long inFlightRequestsBytes, | ||||||
| long inFlightRequestsCount, | ||||||
| long destinationCount, | ||||||
| long maxInflightBytes, | ||||||
| long maxInflightCount) { | ||||||
| return new AutoValue_ConnectionWorker_Load( | ||||||
| timeSinceLastCallback, | ||||||
| inFlightRequestsBytes, | ||||||
| inFlightRequestsCount, | ||||||
| destinationCount, | ||||||
|
|
@@ -1934,20 +1953,29 @@ boolean isOverwhelmed() { | |||||
| // Consider only in flight bytes and count for now, as by experiment those two are the most | ||||||
| // efficient and has great simplity. | ||||||
| return inFlightRequestsCount() > overwhelmedInflightCount * maxInflightCount() | ||||||
| || inFlightRequestsBytes() > overwhelmedInflightBytes * maxInflightBytes(); | ||||||
| || inFlightRequestsBytes() > overwhelmedInflightBytes * maxInflightBytes() | ||||||
| || timeSinceLastCallback().compareTo(overwhelmedTimeSinceLastCallback) > 0; | ||||||
| } | ||||||
|
|
||||||
| // Compares two different load. First compare in flight request bytes split by size 1024 bucket. | ||||||
| // Compares two different load. First compare the timeSinceLastCallback bucketed into 1 second | ||||||
| // intervals. | ||||||
| // Then compare in flight request bytes split by size 1024 bucket. | ||||||
| // Then compare the inflight requests count. | ||||||
| // Then compare destination count of the two connections. | ||||||
| public static final Comparator<Load> LOAD_COMPARATOR = | ||||||
| Comparator.comparing((Load key) -> (int) (key.inFlightRequestsBytes() / 1024)) | ||||||
| Comparator.comparing((Load key) -> (int) (key.timeSinceLastCallback().toMillis() / 1000)) | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can simplify
Suggested change
|
||||||
| .thenComparing((Load key) -> (int) (key.inFlightRequestsBytes() / 1024)) | ||||||
| .thenComparing((Load key) -> (int) (key.inFlightRequestsCount() / 100)) | ||||||
| .thenComparing(Load::destinationCount); | ||||||
|
|
||||||
| // Compares two different load without bucket, used in smaller scale unit testing. | ||||||
| // First compare the timeSinceLastCallback. | ||||||
| // Then compare in flight request bytes. | ||||||
| // Then compare the inflight requests count. | ||||||
| // Then compare destination count of the two connections. | ||||||
| public static final Comparator<Load> TEST_LOAD_COMPARATOR = | ||||||
| Comparator.comparing((Load key) -> (int) key.inFlightRequestsBytes()) | ||||||
| Comparator.comparing(Load::timeSinceLastCallback) | ||||||
| .thenComparing((Load key) -> (int) key.inFlightRequestsBytes()) | ||||||
| .thenComparing((Load key) -> (int) key.inFlightRequestsCount()) | ||||||
| .thenComparing(Load::destinationCount); | ||||||
|
|
||||||
|
|
@@ -1960,6 +1988,11 @@ public static void setOverwhelmedBytesThreshold(double newThreshold) { | |||||
| public static void setOverwhelmedCountsThreshold(double newThreshold) { | ||||||
| overwhelmedInflightCount = newThreshold; | ||||||
| } | ||||||
|
|
||||||
| @VisibleForTesting | ||||||
| public static void setOverwhelmedTimeSinceLastCallbackThreshold(Duration newThreshold) { | ||||||
| overwhelmedTimeSinceLastCallback = newThreshold; | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| @VisibleForTesting | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -94,6 +94,9 @@ void setUp() throws Exception { | |||||||||||||||||||||||||||||||||||||||||
| testBigQueryWrite = new FakeBigQueryWrite(); | ||||||||||||||||||||||||||||||||||||||||||
| ConnectionWorker.setMaxInflightQueueWaitTime(300000); | ||||||||||||||||||||||||||||||||||||||||||
| ConnectionWorker.setMaxInflightRequestWaitTime(Duration.ofMinutes(10)); | ||||||||||||||||||||||||||||||||||||||||||
| ConnectionWorker.Load.setOverwhelmedCountsThreshold(0.2); | ||||||||||||||||||||||||||||||||||||||||||
| ConnectionWorker.Load.setOverwhelmedBytesThreshold(0.2); | ||||||||||||||||||||||||||||||||||||||||||
| ConnectionWorker.Load.setOverwhelmedTimeSinceLastCallbackThreshold(Duration.ofSeconds(3)); | ||||||||||||||||||||||||||||||||||||||||||
| serviceHelper = | ||||||||||||||||||||||||||||||||||||||||||
| new MockServiceHelper( | ||||||||||||||||||||||||||||||||||||||||||
| UUID.randomUUID().toString(), Arrays.<MockGrpcService>asList(testBigQueryWrite)); | ||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -865,29 +868,116 @@ void testLoadCompare_compareLoad() { | |||||||||||||||||||||||||||||||||||||||||
| // In flight bytes bucket is split as per 1024 requests per bucket. | ||||||||||||||||||||||||||||||||||||||||||
| // When in flight bytes is in lower bucket, even destination count is higher and request count | ||||||||||||||||||||||||||||||||||||||||||
| // is higher, the load is still smaller. | ||||||||||||||||||||||||||||||||||||||||||
| Load load1 = ConnectionWorker.Load.create(1000, 2000, 100, 1000, 10); | ||||||||||||||||||||||||||||||||||||||||||
| Load load2 = ConnectionWorker.Load.create(2000, 1000, 10, 1000, 10); | ||||||||||||||||||||||||||||||||||||||||||
| Load load1 = ConnectionWorker.Load.create(Duration.ZERO, 1000, 2000, 100, 1000, 10); | ||||||||||||||||||||||||||||||||||||||||||
| Load load2 = ConnectionWorker.Load.create(Duration.ZERO, 2000, 1000, 10, 1000, 10); | ||||||||||||||||||||||||||||||||||||||||||
| assertThat(Load.LOAD_COMPARATOR.compare(load1, load2)).isLessThan(0); | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| // In flight bytes in the same bucke of request bytes will compare request count. | ||||||||||||||||||||||||||||||||||||||||||
| Load load3 = ConnectionWorker.Load.create(1, 300, 10, 0, 10); | ||||||||||||||||||||||||||||||||||||||||||
| Load load4 = ConnectionWorker.Load.create(10, 1, 10, 0, 10); | ||||||||||||||||||||||||||||||||||||||||||
| Load load3 = ConnectionWorker.Load.create(Duration.ZERO, 1, 300, 10, 0, 10); | ||||||||||||||||||||||||||||||||||||||||||
| Load load4 = ConnectionWorker.Load.create(Duration.ZERO, 10, 1, 10, 0, 10); | ||||||||||||||||||||||||||||||||||||||||||
| assertThat(Load.LOAD_COMPARATOR.compare(load3, load4)).isGreaterThan(0); | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| // In flight request and bytes in the same bucket will compare the destination count. | ||||||||||||||||||||||||||||||||||||||||||
| Load load5 = ConnectionWorker.Load.create(200, 1, 10, 1000, 10); | ||||||||||||||||||||||||||||||||||||||||||
| Load load6 = ConnectionWorker.Load.create(100, 10, 10, 1000, 10); | ||||||||||||||||||||||||||||||||||||||||||
| Load load5 = ConnectionWorker.Load.create(Duration.ZERO, 200, 1, 10, 1000, 10); | ||||||||||||||||||||||||||||||||||||||||||
| Load load6 = ConnectionWorker.Load.create(Duration.ZERO, 100, 10, 10, 1000, 10); | ||||||||||||||||||||||||||||||||||||||||||
| assertThat(Load.LOAD_COMPARATOR.compare(load5, load6) == 0).isTrue(); | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| // timeSinceLastCallback has the highest priority. | ||||||||||||||||||||||||||||||||||||||||||
| // load7 has higher timeSinceLastCallback (2s -> bucket 2) but lower other parameters. | ||||||||||||||||||||||||||||||||||||||||||
| // load8 has lower timeSinceLastCallback (0s -> bucket 0) but higher other parameters. | ||||||||||||||||||||||||||||||||||||||||||
| Load load7 = ConnectionWorker.Load.create(Duration.ofSeconds(2), 0, 0, 0, 10, 10); | ||||||||||||||||||||||||||||||||||||||||||
| Load load8 = ConnectionWorker.Load.create(Duration.ZERO, 10000, 10000, 100, 10, 10); | ||||||||||||||||||||||||||||||||||||||||||
| assertThat(Load.LOAD_COMPARATOR.compare(load7, load8)).isGreaterThan(0); | ||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| @Test | ||||||||||||||||||||||||||||||||||||||||||
| void testLoadIsOverWhelmed() { | ||||||||||||||||||||||||||||||||||||||||||
| // Only in flight request is considered in current overwhelmed calculation. | ||||||||||||||||||||||||||||||||||||||||||
| Load load1 = ConnectionWorker.Load.create(60, 10, 100, 90, 100); | ||||||||||||||||||||||||||||||||||||||||||
| // In-flight requests, bytes, and timeSinceLastCallback are considered in overwhelmed | ||||||||||||||||||||||||||||||||||||||||||
| // calculation. | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| // Overwhelmed by request count | ||||||||||||||||||||||||||||||||||||||||||
| Load load1 = ConnectionWorker.Load.create(Duration.ZERO, 60, 10, 100, 90, 100); | ||||||||||||||||||||||||||||||||||||||||||
| assertThat(load1.isOverwhelmed()).isTrue(); | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| Load load2 = ConnectionWorker.Load.create(1, 1, 100, 100, 100); | ||||||||||||||||||||||||||||||||||||||||||
| assertThat(load2.isOverwhelmed()).isFalse(); | ||||||||||||||||||||||||||||||||||||||||||
| // Not overwhelmed | ||||||||||||||||||||||||||||||||||||||||||
| Load load2 = ConnectionWorker.Load.create(Duration.ZERO, 1, 1, 100, 100, 100); | ||||||||||||||||||||||||||||||||||||||||||
| assertFalse(load2.isOverwhelmed()); | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| // Under threshold (3s) for timeSinceLastCallback | ||||||||||||||||||||||||||||||||||||||||||
| Load load3 = ConnectionWorker.Load.create(Duration.ofSeconds(2), 0, 0, 0, 100, 100); | ||||||||||||||||||||||||||||||||||||||||||
| assertFalse(load3.isOverwhelmed()); | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| // Over threshold (3s) for timeSinceLastCallback | ||||||||||||||||||||||||||||||||||||||||||
| Load load4 = ConnectionWorker.Load.create(Duration.ofSeconds(4), 0, 0, 0, 100, 100); | ||||||||||||||||||||||||||||||||||||||||||
| assertTrue(load4.isOverwhelmed()); | ||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+903
to
+912
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For consistency with the rest of the test suite (which uses Truth's
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| @Test | ||||||||||||||||||||||||||||||||||||||||||
| void testGetLoad_timeSinceLastCallback() throws Exception { | ||||||||||||||||||||||||||||||||||||||||||
| ProtoSchema schema1 = createProtoSchema("foo"); | ||||||||||||||||||||||||||||||||||||||||||
| StreamWriter sw1 = | ||||||||||||||||||||||||||||||||||||||||||
| StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build(); | ||||||||||||||||||||||||||||||||||||||||||
| try (ConnectionWorker connectionWorker = | ||||||||||||||||||||||||||||||||||||||||||
| new ConnectionWorker( | ||||||||||||||||||||||||||||||||||||||||||
| TEST_STREAM_1, | ||||||||||||||||||||||||||||||||||||||||||
| null, | ||||||||||||||||||||||||||||||||||||||||||
| createProtoSchema("foo"), | ||||||||||||||||||||||||||||||||||||||||||
| 10, | ||||||||||||||||||||||||||||||||||||||||||
| 100000, | ||||||||||||||||||||||||||||||||||||||||||
| Duration.ofSeconds(100), | ||||||||||||||||||||||||||||||||||||||||||
| FlowController.LimitExceededBehavior.Block, | ||||||||||||||||||||||||||||||||||||||||||
| TEST_TRACE_ID, | ||||||||||||||||||||||||||||||||||||||||||
| null, | ||||||||||||||||||||||||||||||||||||||||||
| client.getSettings(), | ||||||||||||||||||||||||||||||||||||||||||
| retrySettings, | ||||||||||||||||||||||||||||||||||||||||||
| /* enableRequestProfiler= */ false, | ||||||||||||||||||||||||||||||||||||||||||
| /* enableOpenTelemetry= */ false, | ||||||||||||||||||||||||||||||||||||||||||
| /*isMultiplexing*/ false)) { | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| // Initially empty, should be zero. | ||||||||||||||||||||||||||||||||||||||||||
| assertThat(connectionWorker.getLoad().timeSinceLastCallback()).isEqualTo(Duration.ZERO); | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| // Keep response in flight | ||||||||||||||||||||||||||||||||||||||||||
| testBigQueryWrite.setResponseSleep(java.time.Duration.ofSeconds(5)); | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| // Send a message | ||||||||||||||||||||||||||||||||||||||||||
| ApiFuture<AppendRowsResponse> future = | ||||||||||||||||||||||||||||||||||||||||||
| sendTestMessage(connectionWorker, sw1, createFooProtoRows(new String[] {"hello"}), 0); | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| // Wait a bit to ensure it is sent and in flight queue | ||||||||||||||||||||||||||||||||||||||||||
| Thread.sleep(500); | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| Load load = connectionWorker.getLoad(); | ||||||||||||||||||||||||||||||||||||||||||
| assertThat(load.timeSinceLastCallback()).isGreaterThan(Duration.ZERO); | ||||||||||||||||||||||||||||||||||||||||||
| assertThat(load.timeSinceLastCallback()) | ||||||||||||||||||||||||||||||||||||||||||
| .isLessThan(Duration.ofSeconds(2)); // Should be around 500ms | ||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| @Test | ||||||||||||||||||||||||||||||||||||||||||
| void testLoadCompare_timeSinceLastCallback() { | ||||||||||||||||||||||||||||||||||||||||||
| // Same bytes, same count, same destination, different timeSinceLastCallback | ||||||||||||||||||||||||||||||||||||||||||
| // Bucketed by 1 second (1000ms). | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| // 100ms and 200ms are in the same bucket (0). | ||||||||||||||||||||||||||||||||||||||||||
| Load load1 = ConnectionWorker.Load.create(Duration.ofMillis(100), 0, 0, 0, 0, 0); | ||||||||||||||||||||||||||||||||||||||||||
| Load load2 = ConnectionWorker.Load.create(Duration.ofMillis(200), 0, 0, 0, 0, 0); | ||||||||||||||||||||||||||||||||||||||||||
| assertThat(Load.LOAD_COMPARATOR.compare(load1, load2)).isEqualTo(0); | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| // 100ms and 1200ms are in different buckets (0 vs 1). | ||||||||||||||||||||||||||||||||||||||||||
| Load load3 = ConnectionWorker.Load.create(Duration.ofMillis(1200), 0, 0, 0, 0, 0); | ||||||||||||||||||||||||||||||||||||||||||
| assertThat(Load.LOAD_COMPARATOR.compare(load1, load3)).isLessThan(0); | ||||||||||||||||||||||||||||||||||||||||||
| assertThat(Load.LOAD_COMPARATOR.compare(load3, load1)).isGreaterThan(0); | ||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| @Test | ||||||||||||||||||||||||||||||||||||||||||
| void testTestLoadCompare_timeSinceLastCallback() { | ||||||||||||||||||||||||||||||||||||||||||
| // TEST_LOAD_COMPARATOR compares timeSinceLastCallback unbucketed. | ||||||||||||||||||||||||||||||||||||||||||
| // 1s and 2s should be different. | ||||||||||||||||||||||||||||||||||||||||||
| Load load1 = ConnectionWorker.Load.create(Duration.ofSeconds(1), 0, 0, 0, 0, 0); | ||||||||||||||||||||||||||||||||||||||||||
| Load load2 = ConnectionWorker.Load.create(Duration.ofSeconds(2), 0, 0, 0, 0, 0); | ||||||||||||||||||||||||||||||||||||||||||
| assertThat(Load.TEST_LOAD_COMPARATOR.compare(load1, load2)).isLessThan(0); | ||||||||||||||||||||||||||||||||||||||||||
| assertThat(Load.TEST_LOAD_COMPARATOR.compare(load2, load1)).isGreaterThan(0); | ||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| @Test | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you refactor inflightBytes / inflightRequests / head.requestSendTimeStamp into a a atomic member variaable and read that out instead of grabing the lock? (we don't need to examine the queue to get head.requestSendTimeStamp, we can just update a member variable everytime a request is added)
I think the original code also has bug around inflightBytes, that should be volatile or atomic otherwise it can lead to undefined behavior if read and write happen at the same time