Changes from all commits · 82 commits
81f86ae
Update for 2.2.0
Nov 10, 2025
b408850
[FLINK-38640][table-planner] Fix NPE in DeltaJoinUtil#isFilterOnOneSe…
xuyangzhong Nov 10, 2025
6e89776
[hotfix] [docs] Ververica URL and Product Name has been updated
nacisimsek Nov 12, 2025
f9ad1b1
[hotfix][examples] Add Python examples on how to read binary data fro…
dianfu Nov 12, 2025
1bca9b9
[FLINK-38436][doc] Add vector search doc (#27216)
fsk119 Nov 11, 2025
d421622
[FLINK-38436][doc] Add Chinese version of vector search doc (#27232)
lihaosky Nov 13, 2025
7e2e854
[FLINK-38622][runtime] Enhance the requests and slots balanced alloca…
RocMarshal Nov 13, 2025
71fb419
[FLINK-38611][doc] Add doc for delta join (#27225)
xuyangzhong Nov 14, 2025
30b5b97
[FLINK-33392][docs] Add the documentation pages for balanced tasks sc…
RocMarshal Nov 14, 2025
ca9f7b7
[hotfix][docs] Add `SHOW MATERIALIZED TABLES`, `SHOW CREATE MATERIALI…
snuyanzin Nov 14, 2025
f621ab6
[FLINK-38682][table-planner] Support unknown -> RAW cast during type …
ferenc-csaky Nov 17, 2025
434c800
[FLINK-38686][doc] Add model table api documentation (#27243) (#27246)
lihaosky Nov 18, 2025
ef31ad4
[FLINK-38695][table-planner] Fix wrong metric about left cache reques…
xuyangzhong Nov 20, 2025
4fd5bea
[FLINK-36746][core] Fix the deadlock bug in SerializedThrowable
RocMarshal Nov 3, 2025
e666847
[FLINK-38709][table][python] Fix ScalarFunctionSplitter to allow Pyth…
dianfu Nov 20, 2025
bf9cc5d
[FLINK-38700][python] Fix kubernetes pyflink application test
snuyanzin Nov 25, 2025
2910d72
[BP-2.2][FLINK-38711][build] Set maven-shade-plugin logs to DEBUG
mateczagany Nov 25, 2025
fefb741
[hotfix] Fix flink-model-openai NOTICE for jtokkit
mateczagany Nov 26, 2025
e4c197d
[FLINK-38576][table] Align commonJoinKey in MultiJoin for logical and…
gustavodemorais Nov 26, 2025
ad47717
[FLINK-38750][table] Validation of queries with functions erroneously…
snuyanzin Dec 2, 2025
3f79fac
[FLINK-38084][doc] Add download doc for model providers (#27296)
lihaosky Dec 3, 2025
3546641
[FLINK-38767][table] Fix vector search execnode transform name (#27304)
lihaosky Dec 4, 2025
8de272c
[FLINK-38773][table] Fix batch vector search excnode context (#27311)
lihaosky Dec 4, 2025
eb65369
[FLINK-38797][python] Fixed CsvSchemaBuilder.set_null_value to return…
wchan87 Dec 10, 2025
932aa43
[FLINK-38824][table] Fix incorrect default values for primitive types
dylanhz Dec 25, 2025
0f27e47
[FLINK-38703][runtime] Update slot manager metrics in thread-safety m…
ztison Nov 20, 2025
8647457
[FLINK-38914][docs] Preserve page path when linking to stable version
MartijnVisser Dec 29, 2025
28bda0b
[FLINK-38914][docs] Add canonical tags pointing to stable docs
MartijnVisser Jan 14, 2026
0a83669
[FLINK-38925][docs] Update Matomo URL to the right domain
MartijnVisser Jan 15, 2026
00c6e94
[FLINK-38924][docs] Redirect users to documentation home page when en…
MartijnVisser Jan 15, 2026
17e038d
[FLINK-38951][python] Bump pemja to 0.5.6
Sxnan Jan 21, 2026
2e1f8ad
[FLINK-38924][docs] Actually redirect users to 404 page when trying t…
MartijnVisser Jan 21, 2026
40b8fea
[FLINK-38955][docs] Add canonical tag to generated Javadoc
MartijnVisser Jan 21, 2026
3d9388a
[FLINK-38955][docs] Configure canonical tag for PyDocs
MartijnVisser Jan 21, 2026
1069da9
[FLINK-38925][docs] Add Matomo config to PyDocs
MartijnVisser Jan 21, 2026
81b1469
[FLINK-38950][table] `SqlNodeConvertUtils` should use validated query…
seb-pereira Jan 21, 2026
05ee000
[FLINK-38957][table-planner] Support `ProcessTableFunction` registrat…
ferenc-csaky Feb 4, 2026
28bfdf6
[hotfix][docs] Update AWS connector version to 6.0
ferenc-csaky Feb 5, 2026
11bdba1
[FLINK-38913][table] `ArrayIndexOutOfBoundsException` while unparsing…
davidradl Feb 5, 2026
8d8b7cd
[FLINK-21672][test] Amend unit test so it works with IBM java (#27515…
davidradl Feb 7, 2026
4ea4611
[FLINK-39052][python] Remove usage of pkg_resources (#27556)
dianfu Feb 9, 2026
a95e225
[FLINK-39052][python] Bump version auditwheel to 6.6.0 (#27563)
dianfu Feb 10, 2026
ea843de
[FLINK-38945][docs] Bump Hugo to v0.124.1 to fix ref resolution for c…
MartijnVisser Feb 10, 2026
1e868c8
[FLINK-39017][web] Fix task click handler in job graph for Chrome 144+
MartijnVisser Feb 10, 2026
23dfc74
[hotfix][infra] Add .metals and .bloop into .gitignore (#27480)
davidradl Jan 28, 2026
526aaeb
[FLINK-39099] Update `testcontainers` to fix GitHub runner Docker com…
ferenc-csaky Feb 17, 2026
6adcc28
[FLINK-39102][e2e] Fix Kubernetes E2E tests broken by Docker 29 runne…
MartijnVisser Feb 17, 2026
b1508fe
[FLINK-36059][tests] Bump cp-kafka and cp-schema-registry from 7.2.2 …
MartijnVisser Feb 18, 2026
88a10d9
[Hotfix] Update lz4-java to 1.10.3
eschcam Feb 23, 2026
a68f4d4
[FLINK-39130][metrics] Allow native types in MetricConfig
Izeren Feb 20, 2026
4a1bb52
[FLINK-38585][python] Fix python env set in Pyflink's thread mode whe…
Feb 26, 2026
d0b188a
[FLINK-39022][security] Set security.ssl.algorithms default value to …
balassai Mar 2, 2026
7b9a4fe
[FLINK-38624][table] Type Mismatch Exception in StreamPhysicalOverAgg…
snuyanzin Mar 2, 2026
d1da51a
[FLINK-39150][runtime] Fix join operator crashes jobs when using cust…
noorall Feb 27, 2026
b5bdb23
[FLINK-39162][checkpoints] Disable UC for CustomPartitioner
rkhachatryan Mar 3, 2026
41455ec
[FLINK-38720][table] Nested nullability might lead to class cast `Rex…
snuyanzin Nov 25, 2025
6290979
[FLINK-33217][table] `UNNEST` fails with on `LEFT JOIN` with `NOT NUL…
snuyanzin Nov 10, 2025
f0f84c1
[FLINK-39242][python] Fix error when getting columns from ResolvedSchema
autophagy Mar 16, 2026
26ab887
[FLINK-38704][metrics] Fix MetricConfig.getString() to handle numeric…
mukul-8 Mar 20, 2026
614c549
[FLINK-38916][table-planner] MultiJoin produces incorrect results for…
gustavodemorais Mar 24, 2026
c361ea6
[FLINK-39360][table] `LIKE` clause doesn't support some patterns
snuyanzin Apr 1, 2026
0729118
[FLINK-39371][table] `CurrentDatabase` fails for some patterns
snuyanzin Apr 1, 2026
53addd6
[FLINK-38815] Mask sensitive values in Pekko debug configuration logs
ferenc-csaky Apr 1, 2026
4c9fad6
[FLINK-39355][table-planner] Fix field names in code generation for J…
snuyanzin Apr 1, 2026
43d524d
[FLINK-39073][runtime] Defer alignment check for idle splits
Efrat19 Feb 19, 2026
93558de
[FLINK-39073][runtime] Improve logging of invalid split transitions
Efrat19 Feb 19, 2026
bf113c6
[FLINK-39073][runtime] Test split state timers during deferred alignm…
Efrat19 Feb 22, 2026
ef7e2e3
[FLINK-39015][table] Fix key extractor for multi join by changing Gen…
gustavodemorais Apr 14, 2026
ecee52b
[FLINK-39394][web] Fix job overview metrics broken when a vertex is f…
Izeren Apr 2, 2026
bc0a0ea
[FLINK-39395][build] Add spotless `upToDateChecking`
snuyanzin Apr 16, 2026
176dee1
[FLINK-39424][table] Setting LIKE does not support default escape cha…
snuyanzin Apr 17, 2026
61b85e2
[FLINK-39480][ci] Python wheel on MacOS fails for all 2.x branches
snuyanzin Apr 17, 2026
66689e4
[FLINK-39534][python] Bump pemja to 0.5.7 (#28016)
wenjin272 Apr 24, 2026
b976e79
[FLINK-39293][table] `MATCH_RECOGNIZE` fails with `SqlParserException…
snuyanzin Apr 24, 2026
e0b566f
[FLINK-33903][ci] Reenable tests in GHA
snuyanzin Apr 27, 2026
89979da
[hotfix][ci] Set correct maven repo folder in flink-ci (#28049)
snuyanzin Apr 27, 2026
ce68d4a
[FLINK-39547][table] Fix table-planner class loading order
piotrp Apr 29, 2026
f9d42d6
Fix typo
OlivierHermans Apr 29, 2026
5efd9b3
Another typo spotted
OlivierHermans Apr 29, 2026
0d70b3e
Missing space
OlivierHermans Apr 29, 2026
9ed91b2
Implement change of PR 28064
OlivierHermans Apr 29, 2026
de70c41
Update to the zh content
OlivierHermans Apr 29, 2026
21 changes: 20 additions & 1 deletion .github/workflows/docs.sh
@@ -51,6 +51,16 @@ if [ $? -ne 0 ]; then
exit 1
fi

# Generate .htaccess with dynamic 404 path based on branch
BRANCH=$(git branch --show-current)
cat > docs/target/.htaccess << EOF
# Ensure index.html is served for directory requests
DirectoryIndex index.html

# Custom 404 error page
ErrorDocument 404 /flink/flink-docs-${BRANCH}/404.html
EOF

# build Flink; required for Javadoc step
mvn clean install -B -DskipTests -Dfast -Dskip.npm -Pskip-webui-build

@@ -62,7 +72,16 @@ mvn javadoc:aggregate -B \
-Dcheckstyle.skip=true \
-Dspotless.check.skip=true \
-Denforcer.skip=true \
-Dheader="<a href=\"http://flink.apache.org/\" target=\"_top\"><h1>Back to Flink Website</h1></a> <script>var _paq=window._paq=window._paq||[];_paq.push([\"disableCookies\"]),_paq.push([\"setDomains\",[\"*.flink.apache.org\",\"*.nightlies.apache.org/flink\"]]),_paq.push([\"trackPageView\"]),_paq.push([\"enableLinkTracking\"]),function(){var u=\"//matomo.privacy.apache.org/\";_paq.push([\"setTrackerUrl\",u+\"matomo.php\"]),_paq.push([\"setSiteId\",\"1\"]);var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s)}();</script>"
-Dheader="<a href=\"http://flink.apache.org/\" target=\"_top\"><h1>Back to Flink Website</h1></a> <script>var _paq=window._paq=window._paq||[];_paq.push([\"disableCookies\"]),_paq.push([\"setDomains\",[\"*.flink.apache.org\",\"*.nightlies.apache.org/flink\"]]),_paq.push([\"trackPageView\"]),_paq.push([\"enableLinkTracking\"]),function(){var u=\"//analytics.apache.org/\";_paq.push([\"setTrackerUrl\",u+\"matomo.php\"]),_paq.push([\"setSiteId\",\"1\"]);var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s)}();</script>"

# Inject canonical tags into Javadoc HTML files to point to stable docs version
CANONICAL_BASE="https://nightlies.apache.org/flink/flink-docs-stable/api/java"
find target/site/apidocs -name "*.html" -type f | while read -r file; do
REL_PATH="${file#target/site/apidocs/}"
CANONICAL_URL="${CANONICAL_BASE}/${REL_PATH}"
sed -i "s|<head>|<head>\n<link rel=\"canonical\" href=\"${CANONICAL_URL}\">|" "$file"
done

mv target/site/apidocs docs/target/api/java

# build python docs
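The canonical-link injection above is a `sed` one-liner over every generated Javadoc page. As a rough illustration of the same transformation, here is a hypothetical stand-alone Python sketch (not the script the workflow runs; function names are invented for illustration):

```python
from pathlib import Path

CANONICAL_BASE = "https://nightlies.apache.org/flink/flink-docs-stable/api/java"

def inject_canonical(html: str, rel_path: str) -> str:
    """Insert a <link rel="canonical"> right after the first <head> tag,
    mirroring the sed expression s|<head>|<head>\\n<link ...>|."""
    canonical = f'<link rel="canonical" href="{CANONICAL_BASE}/{rel_path}">'
    return html.replace("<head>", "<head>\n" + canonical, 1)

def inject_tree(root: str) -> None:
    """Apply the injection to every .html file under `root`, using the
    path relative to `root` to build the canonical URL."""
    root_path = Path(root)
    for file in root_path.rglob("*.html"):
        rel = file.relative_to(root_path).as_posix()
        file.write_text(inject_canonical(file.read_text(), rel))
```

Replacing only the first `<head>` keeps the operation idempotent enough for a one-shot build step; the shell version relies on each page containing a single `<head>` tag.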
6 changes: 5 additions & 1 deletion .github/workflows/nightly.yml
@@ -101,7 +101,10 @@ jobs:
with:
python-version: '3.x'
- name: "Install cibuildwheel"
run: python -m pip install cibuildwheel==2.16.5
# Need to limit setuptools here in order to apply limitations for transitive dependencies
run: |
python -m pip install cibuildwheel==2.16.5
echo "setuptools<82" > /tmp/build-constraints.txt
- name: "Build python wheels for ${{ matrix.os_name }}"
run: python -m cibuildwheel --output-dir flink-python/dist flink-python
env:
@@ -116,6 +119,7 @@
CIBW_REPAIR_WHEEL_COMMAND_LINUX: "auditwheel repair -w {dest_dir} {wheel}"
# Skip repair on MacOS
CIBW_REPAIR_WHEEL_COMMAND_MACOS: ""
PIP_CONSTRAINT: /tmp/build-constraints.txt
- name: "Upload python wheels"
uses: actions/upload-artifact@v4
with:
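The `PIP_CONSTRAINT` environment variable is pip's equivalent of `pip install -c <file>`: every install honoring it is capped by the pins in the file, which is how the workflow limits `setuptools` even when it is only a transitive build dependency inside cibuildwheel's isolated environments. The constraint written here is just `setuptools<82`; a toy comparator (not pip's full PEP 440 logic, numeric dotted versions only) shows what that exclusive upper bound means:

```python
def violates_upper_bound(version: str, bound: str) -> bool:
    """True if `version` is >= the exclusive upper `bound`.
    Toy numeric-dotted-version comparison, not full PEP 440."""
    def key(v: str):
        return [int(p) for p in v.split(".")]
    return key(version) >= key(bound)

# The constraint written by the workflow is "setuptools<82":
assert violates_upper_bound("82.0.1", "82")    # rejected by the constraint
assert not violates_upper_bound("81.9", "82")  # allowed
```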
23 changes: 21 additions & 2 deletions .github/workflows/template.flink-ci.yml
@@ -247,17 +247,36 @@ jobs:
working-directory: ${{ env.CONTAINER_LOCAL_WORKING_DIR }}
run: ./tools/azure-pipelines/cache_docker_images.sh load

- name: "Create non-root user for test execution"
run: |
useradd -m -u 1001 flink
echo "flink ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers
for sock in /var/run/docker.sock /run/docker.sock; do
if [ -e "$sock" ]; then
chmod 666 "$sock"
echo "Docker socket found at $sock"
fi
done
mkdir -p \
${{ env.CONTAINER_LOCAL_WORKING_DIR }} \
${{ env.MAVEN_REPO_FOLDER }} \
${{ env.DOCKER_IMAGES_CACHE_FOLDER }}
chown -R flink:flink ${{ env.CONTAINER_LOCAL_WORKING_DIR }}
chown -R flink:flink ${{ env.MAVEN_REPO_FOLDER }}
chown -R flink:flink ${{ env.DOCKER_IMAGES_CACHE_FOLDER }}

- name: "Test - ${{ matrix.module }}"
id: test-run
working-directory: ${{ env.CONTAINER_LOCAL_WORKING_DIR }}
env:
IT_CASE_S3_BUCKET: ${{ secrets.s3_bucket }}
IT_CASE_S3_ACCESS_KEY: ${{ secrets.s3_access_key }}
IT_CASE_S3_SECRET_KEY: ${{ secrets.s3_secret_key }}
DOCKER_HOST: unix:///var/run/docker.sock
timeout-minutes: ${{ fromJSON(env.GHA_JOB_TIMEOUT) }}
run: |
${{ inputs.environment }} PROFILE="$PROFILE -Pgithub-actions" ./tools/azure-pipelines/uploading_watchdog.sh \
./tools/ci/test_controller.sh ${{ matrix.module }}
runuser -u flink -- bash -c '${{ inputs.environment }} PROFILE="$PROFILE -Pgithub-actions" ./tools/azure-pipelines/uploading_watchdog.sh \
./tools/ci/test_controller.sh ${{ matrix.module }}'

- name: "Post-build Disk Info"
if: ${{ always() }}
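The socket loop in the step above opens up any Docker socket it finds so that the newly created non-root `flink` user can still reach the daemon. A minimal Python sketch of that pattern (hypothetical helper, mirroring the `chmod 666` loop, not part of the workflow):

```python
import os

def open_socket_perms(candidates):
    """Make any existing Docker socket world-read/writable so a non-root
    test user can talk to the daemon; returns the sockets found."""
    found = []
    for sock in candidates:
        if os.path.exists(sock):
            os.chmod(sock, 0o666)
            found.append(sock)
    return found
```

Note that `chmod 666` on the Docker socket effectively grants daemon access (and thus root-equivalent power) to every local user, which is acceptable on a throwaway CI runner but not on a shared host.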
3 changes: 3 additions & 0 deletions .gitignore
@@ -7,11 +7,14 @@ scalastyle-output.xml
!.idea/vcs.xml
!.idea/icon.png
.vscode
.metals
.bloop
.cursor
.metadata
.settings
.project
.version.properties
.spotless-index-file
filter.properties
logs.zip
.mvn/wrapper/*.jar
19 changes: 10 additions & 9 deletions docs/config.toml
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

baseURL = '//nightlies.apache.org/flink/flink-docs-master'
baseURL = '//nightlies.apache.org/flink/flink-docs-release-2.2'
languageCode = "en-us"
title = "Apache Flink"
enableGitInfo = false
@@ -24,7 +24,7 @@ pygmentsUseClasses = true
[params]
# Flag whether this is a stable version or not.
# Used for the quickstart page.
IsStable = false
IsStable = true

# Flag to indicate whether an outdated warning should be shown.
ShowOutDatedWarning = false
@@ -34,14 +34,14 @@ pygmentsUseClasses = true
# we change the version for the complete docs when forking of a release branch
# etc.
# The full version string as referenced in Maven (e.g. 1.2.1)
Version = "2.2-SNAPSHOT"
Version = "2.2.0"

# For stable releases, leave the bugfix version out (e.g. 1.2). For snapshot
# release this should be the same as the regular version
VersionTitle = "2.2-SNAPSHOT"
VersionTitle = "2.2"

# The branch for this version of Apache Flink
Branch = "master"
Branch = "release-2.2"

# The github repository for Apache Flink
Repo = "//github.com/apache/flink"
@@ -60,19 +60,20 @@ pygmentsUseClasses = true

ZhDownloadPage = "//flink.apache.org/zh/downloads.html"

JavaDocs = "//nightlies.apache.org/flink/flink-docs-master/api/java/"
JavaDocs = "//nightlies.apache.org/flink/flink-docs-release-2.2/api/java/"

PyDocs = "//nightlies.apache.org/flink/flink-docs-master/api/python/"
PyDocs = "//nightlies.apache.org/flink/flink-docs-release-2.2/api/python/"

# External links at the bottom
# of the menu
MenuLinks = [
["Project Homepage", "//flink.apache.org"],
["JavaDocs", "//nightlies.apache.org/flink/flink-docs-master/api/java/"],
["PyDocs", "//nightlies.apache.org/flink/flink-docs-master/api/python/"]
["JavaDocs", "//nightlies.apache.org/flink/flink-docs-release-2.2/api/java/"],
["PyDocs", "//nightlies.apache.org/flink/flink-docs-release-2.2/api/python/"]
]

PreviousDocs = [
["2.2", "http://nightlies.apache.org/flink/flink-docs-release-2.2"],
["2.1", "http://nightlies.apache.org/flink/flink-docs-release-2.1"],
["2.0", "http://nightlies.apache.org/flink/flink-docs-release-2.0"],
["1.20", "http://nightlies.apache.org/flink/flink-docs-release-1.20"],
41 changes: 41 additions & 0 deletions docs/content.zh/docs/connectors/models/downloads.md
@@ -0,0 +1,41 @@
---
title: 下载页面
weight: 100
type: docs
bookToc: false
aliases:
- /zh/dev/table/connectors/downloads.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# SQL Models 下载页面

{{< unstable >}}
{{< hint info >}}
Download links are available only for stable releases.
{{< /hint >}}
{{< /unstable >}}

This page contains links to optional sql-client models that are not part of the binary distribution.

# 可选的 SQL Models
-------------------

{{< sql_optional_models >}}
4 changes: 2 additions & 2 deletions docs/content.zh/docs/deployment/overview.md
@@ -302,9 +302,9 @@ Supported Environments:
Supported Environment:
{{< label Huawei Cloud >}}

#### Ververica Platform
#### Ververica's Unified Streaming Data Platform (Managed Service / BYOC / Self-Managed)

[Website](https://www.ververica.com/platform-overview)
[Website](https://www.ververica.com/product)

Supported Environments:
{{< label AliCloud >}}
16 changes: 6 additions & 10 deletions docs/content.zh/docs/deployment/security/security-ssl.md
@@ -150,20 +150,16 @@ security.ssl.rest.authentication-enabled: false

### Cipher suites

{{< hint warning >}}
The [IETF RFC 7525](https://tools.ietf.org/html/rfc7525) recommends to use a specific set of cipher suites for strong security.
Because these cipher suites were not available on many setups out of the box, Flink's default value is set to a slightly
weaker but more compatible cipher suite.
We recommend that SSL setups update to the stronger cipher suites, if possible, by adding the below entry to the Flink configuration:
For strong security, it is crucial to use modern and robust cipher suites. [IETF RFC 9325](https://www.rfc-editor.org/info/rfc9325), which supersedes the older RFC 7525, provides current recommendations for the secure use of TLS.

In response to evolving security standards and to ensure compatibility with modern Java versions, Flink has updated its default cipher suites. Recent JDK updates (affecting versions like 11.0.30+, 17.0.18+, etc.) have disabled older `TLS_RSA_*` cipher suites that lack forward secrecy.

```yaml
security.ssl.algorithms: TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
```
To support these secure-by-default JDK versions and align with best practices, Flink's default value for `security.ssl.algorithms` is now:

If these cipher suites are not supported on your setup, you will see that Flink processes will not be able to connect to each other.
`TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384`

{{< /hint >}}
This default provides strong security and wide compatibility. You can customize the cipher suites using the `security.ssl.algorithms` configuration option if your environment has different requirements.
If these cipher suites are not supported on your setup, you will see that Flink processes will not be able to connect to each other.
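One way to sanity-check whether the suites listed in `security.ssl.algorithms` exist in a local TLS stack is to translate the JVM-style (IANA) names to their OpenSSL equivalents and look them up. A rough sketch using Python's `ssl` module — the IANA-to-OpenSSL mapping below is hardcoded for just the two default suites named above, and the helper name is invented for illustration:

```python
import ssl

# IANA names (as used by the JVM / Flink config) -> OpenSSL names
IANA_TO_OPENSSL = {
    "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256": "ECDHE-RSA-AES128-GCM-SHA256",
    "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384": "ECDHE-RSA-AES256-GCM-SHA384",
}

def locally_available(iana_name: str) -> bool:
    """Check whether the suite is offered by the local OpenSSL build."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    openssl_names = {c["name"] for c in ctx.get_ciphers()}
    return IANA_TO_OPENSSL.get(iana_name) in openssl_names

for suite in IANA_TO_OPENSSL:
    print(suite, "available:", locally_available(suite))
```

This only inspects the local OpenSSL build, not the JVM's provider, so treat it as a quick hint rather than a definitive check of what Flink's JVM will negotiate.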

### Complete List of SSL Options

23 changes: 23 additions & 0 deletions docs/content.zh/docs/deployment/tasks-scheduling/_index.md
@@ -0,0 +1,23 @@
---
title: Tasks Scheduling
bookCollapseSection: true
weight: 9
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
@@ -0,0 +1,123 @@
---
title: Balanced Tasks Scheduling
weight: 5
type: docs

---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Balanced Tasks Scheduling

This page describes the background and principle of balanced tasks scheduling,
and how to use it when running streaming jobs.

## Background

When the parallelism of the vertices within a Flink streaming job is not uniform,
the [default strategy]({{< ref "docs/deployment/config" >}}#taskmanager-load-balance-mode)
Flink uses to deploy tasks can leave some `TaskManagers` with more tasks than others,
resulting in excessive resource utilization on the `TaskManagers`
that host more tasks, which can become a bottleneck for the entire job.

{{< img src="/fig/deployments/tasks-scheduling/tasks_scheduling_skew_case.svg" alt="The Skew Case of Tasks Scheduling" class="offset" width="50%" >}}

As shown in figure (a), consider a Flink job comprising two vertices, `JobVertex-A (JV-A)` and `JobVertex-B (JV-B)`,
with parallelisms of `6` and `3` respectively,
where both vertices share the same slot sharing group.
Under the default tasks scheduling strategy, as illustrated in figure (b),
the distribution of tasks across `TaskManagers` may result in significant disparities in task load:
the most heavily loaded `TaskManager` may host `4` tasks,
while the least loaded may have only `2`.
Consequently, a `TaskManager` bearing `4` tasks is prone to becoming a performance bottleneck for the entire job.

Therefore, Flink provides a task-quantity-based balanced tasks scheduling capability.
Within the job's resource view, it aims to keep the number of tasks
scheduled to each `TaskManager` as close to equal as possible,
thereby reducing the resource usage skew among `TaskManagers`.

<span class="label label-info">Note</span> The presence of inconsistent parallelism does not by itself imply that this strategy must be used; in practice it is not always beneficial.

## Principle

The task-quantity-based balanced tasks scheduling strategy completes the assignment of tasks to `TaskManagers` in two phases:
- The tasks-to-slots assignment phase
- The slots-to-TaskManagers assignment phase

This section uses two examples to illustrate, in simplified form, how
the task-quantity-based tasks scheduling strategy handles the assignment in each of these two phases.

### The tasks-to-slots assignment phase

Taking the job shown in figure (c) as an example, it contains five job vertices with parallelism degrees of `1`, `4`, `4`, `2`, and `3`, respectively.
All five job vertices belong to the default slot sharing group.

{{< img src="/fig/deployments/tasks-scheduling/tasks_to_slots_allocation_principle.svg" alt="The Tasks To Slots Allocation Principle Demo" class="offset" width="65%" >}}

During the tasks-to-slots assignment phase, this tasks scheduling strategy:
- First assigns the `i-th` task of each vertex with the highest parallelism directly to the `i-th` slot.

That is, task `JV-Bi` is assigned directly to `sloti`, and task `JV-Ci` is assigned directly to `sloti`.

- Next, assigns the tasks of job vertices with sub-maximal parallelism in a round-robin fashion across the slots within the current
slot sharing group until all tasks are allocated.

As shown in figure (e), under the task-quantity-based assignment strategy, the range (max-min difference) of the number of tasks per slot is `1`,
which is better than the range of `3` under the default strategy shown in figure (d).

Thus, this ensures a more balanced distribution of the number of tasks across slots.
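The two rules above can be sketched as a small simulation, using the vertex names and parallelisms from the example in figure (c). This is an illustration of the described strategy, not Flink's actual implementation:

```python
def assign_tasks_to_slots(parallelisms):
    """parallelisms: dict of vertex name -> parallelism, all vertices
    sharing one slot sharing group. Returns a list of slots (task lists)."""
    num_slots = max(parallelisms.values())
    slots = [[] for _ in range(num_slots)]
    # Rule 1: vertices with the maximum parallelism map task i -> slot i.
    for v, p in parallelisms.items():
        if p == num_slots:
            for i in range(p):
                slots[i].append(f"{v}{i}")
    # Rule 2: remaining tasks are spread round-robin over the slots.
    cursor = 0
    for v, p in parallelisms.items():
        if p != num_slots:
            for i in range(p):
                slots[cursor % num_slots].append(f"{v}{i}")
                cursor += 1
    return slots

# The five vertices of figure (c): parallelisms 1, 4, 4, 2, 3.
slots = assign_tasks_to_slots({"JV-A": 1, "JV-B": 4, "JV-C": 4, "JV-D": 2, "JV-E": 3})
sizes = [len(s) for s in slots]
print(sizes)  # → [4, 4, 3, 3]
assert max(sizes) - min(sizes) <= 1
```

The resulting per-slot task counts differ by at most `1`, matching the range of `1` described for figure (e).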

### The slots-to-TaskManagers assignment phase

As shown in figure (f), consider a Flink job comprising two vertices, `JV-A` and `JV-B`, with parallelisms of `6` and `3` respectively,
where both vertices share the same slot sharing group.

{{< img src="/fig/deployments/tasks-scheduling/slots_to_taskmanagers_allocation_principle.svg" alt="The Slots to TaskManagers Allocation Principle Demo" class="offset" width="75%" >}}

The assignment result after the first phase is shown in figure (g),
where `Slot0`, `Slot1`, and `Slot2` each contain `2` tasks, while the remaining slots contain `1` task each.

Subsequently:
- The strategy submits all slot requests and waits until all slot resources required for the current job are ready.

Once the slot resources are ready:
- The strategy then sorts all slot requests in descending order based on the number of tasks contained in each request.
Afterward, it sequentially assigns each slot request to the `TaskManager` with the smallest current task load.
This process continues until all slot requests have been allocated.

The final assignment result is shown in figure (i), where each `TaskManager` ends up with exactly `3` tasks,
resulting in a task count difference of `0` between `TaskManagers`. In contrast, the scheduling result under the default strategy,
shown in figure (h), has a task count difference of `2` between `TaskManagers`.
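The greedy step just described — sort slot requests by task count, then repeatedly pick the least-loaded `TaskManager` — can be sketched as follows (an illustration of the principle with invented helper names, not Flink's implementation; the example numbers come from figure (g)):

```python
def assign_slots_to_taskmanagers(slot_task_counts, num_tms, slots_per_tm):
    """Sort slot requests by task count (descending), then give each to the
    TaskManager with the lowest current task load that still has a free slot.
    Returns the final per-TaskManager task counts."""
    tms = [{"tasks": 0, "slots": 0} for _ in range(num_tms)]
    for count in sorted(slot_task_counts, reverse=True):
        candidates = [t for t in tms if t["slots"] < slots_per_tm]
        target = min(candidates, key=lambda t: t["tasks"])
        target["tasks"] += count
        target["slots"] += 1
    return [t["tasks"] for t in tms]

# Figure (g): three slots with 2 tasks each, three with 1 task each,
# spread over 3 TaskManagers with 2 slots each.
loads = assign_slots_to_taskmanagers([2, 2, 2, 1, 1, 1], num_tms=3, slots_per_tm=2)
print(loads)  # → [3, 3, 3]
assert max(loads) - min(loads) == 0
```

Each `TaskManager` ends up with exactly `3` tasks, matching the task count difference of `0` described for figure (i).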

Therefore, if you are seeing performance bottlenecks of the sort described above,
using this balanced tasks scheduling strategy can improve performance.
Be aware that you should not use this strategy if you are not seeing these bottlenecks,
as you may experience performance degradation.

## Usage

You can enable balanced tasks scheduling through the following configuration item:

- `taskmanager.load-balance.mode`: `tasks`

## More details

See the <a href="https://cwiki.apache.org/confluence/x/U56zDw">FLIP-370</a> for more details.

{{< top >}}