-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpg_refclone.py
More file actions
executable file
·234 lines (195 loc) · 9.52 KB
/
pg_refclone.py
File metadata and controls
executable file
·234 lines (195 loc) · 9.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
#!/usr/bin/env python3
import argparse
import logging
import os
import random
import string
import sys
import shutil
import subprocess
from pathlib import Path
def generate_slot_name():
    """Return a replication slot name of the form ``pg_refclone_XXXXXX``.

    The six-character suffix is drawn from lowercase ASCII letters and
    digits via the ``random`` module, so two concurrent runs are unlikely
    (but not guaranteed) to choose the same slot name.
    """
    alphabet = string.ascii_lowercase + string.digits
    suffix = "".join(random.choices(alphabet, k=6))
    return "pg_refclone_" + suffix
import psycopg
from psycopg import connect
from psycopg.rows import namedtuple_row
# to ensure that we are actually connecting to the right database,
# get socket dir and port from the postmaster.pid file in original_dir
def conn_str_from_postmaster_pid(data_dir):
    """Build a libpq connection string for the server that owns *data_dir*.

    Reads ``postmaster.pid`` and takes the port from its 4th line and the
    Unix socket directory from its 5th line. The connection therefore goes
    to the server running on *data_dir*, not to whatever the default
    connection settings point at. Values are single-quoted when libpq
    requires it (for example, a socket directory containing spaces).

    Args:
        data_dir: Path (``str`` or ``Path``) of the PostgreSQL data directory.

    Returns:
        A connection string ``host=<socket_dir> port=<port> dbname=postgres``.

    Raises:
        ValueError: If ``postmaster.pid`` is missing, has fewer than five
            lines, or lists no Unix socket directory.
    """
    postmaster_pid = Path(data_dir) / "postmaster.pid"
    if not postmaster_pid.exists():
        raise ValueError(f"No postmaster.pid found in {data_dir}")
    lines = postmaster_pid.read_text().strip().splitlines()
    if len(lines) < 5:
        raise ValueError(f"Invalid postmaster.pid in {data_dir}")
    port = lines[3].strip()
    socket_dir = lines[4].strip()
    if not socket_dir:
        # An empty host= would make libpq swallow the next keyword as the
        # host value (or fall back to a default socket), so we could end up
        # talking to a different server. Refuse instead.
        raise ValueError(
            f"postmaster.pid in {data_dir} lists no Unix socket directory "
            "(is unix_socket_directories empty?)"
        )
    return (
        f"host={_quote_conninfo_value(socket_dir)} "
        f"port={_quote_conninfo_value(port)} dbname=postgres"
    )


def _quote_conninfo_value(value):
    """Quote *value* for a libpq ``keyword=value`` connection string if needed.

    Empty values, and values containing whitespace, a single quote or a
    backslash, are wrapped in single quotes with ``\\`` and ``'``
    backslash-escaped. Any other value is returned unchanged, so ordinary
    paths and ports keep their plain form.
    """
    if value and not any(ch.isspace() or ch in "'\\" for ch in value):
        return value
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"
def get_pg_bin_dir(data_dir):
    """Return the directory holding the ``postgres`` executable for *data_dir*.

    The executable path is read from the first line of ``postmaster.opts``
    in *data_dir*.

    Args:
        data_dir: Path (``str`` or ``Path``) of the PostgreSQL data directory.

    Returns:
        The parent directory of the recorded ``postgres`` executable, as ``str``.

    Raises:
        ValueError: If ``postmaster.opts`` is missing or empty.
    """
    postmaster_opts = Path(data_dir) / "postmaster.opts"
    if not postmaster_opts.exists():
        raise ValueError(f"No postmaster.opts found in {data_dir}")
    lines = postmaster_opts.read_text().strip().splitlines()
    if not lines:
        raise ValueError(f"Invalid postmaster.opts in {data_dir}")
    # postmaster.opts records the executable path unquoted, followed by each
    # argument as ` "arg"` (pg_ctl parses it the same way). Splitting on the
    # first ` "` keeps executable paths that contain spaces intact, which a
    # plain whitespace split would truncate.
    postgres_path = lines[0].split(' "', 1)[0].rstrip()
    return str(Path(postgres_path).parent)
def get_pg_receivewal_path(data_dir):
    """Return the path of ``pg_receivewal`` next to the server's ``postgres`` binary.

    The binary directory comes from ``get_pg_bin_dir(data_dir)``, so the
    tool is taken from the same installation directory as the executable
    recorded for the running server.
    """
    return str(Path(get_pg_bin_dir(data_dir)).joinpath("pg_receivewal"))
# Placeholder contents for the clone's backup_label while cloning is still in
# progress; main() overwrites it with the real label from pg_backup_stop().
INCOMPLETE_BACKUP_LABEL = (
    "pg_refclone.py is not finished!\n"
    "Removing this file and starting PostgreSQL\n"
    "will implicitly accept data corruption,\n"
    "as the recovery will not run through\n"
    "enough WAL to make things consistent.\n"
)

# Emit bare messages (no level/timestamp prefix) at DEBUG and above; with no
# stream given, basicConfig logs to stderr.
logging.basicConfig(format='%(message)s', level=logging.DEBUG)
logger = logging.getLogger(__name__)
def _as_bytes(value):
    """Return *value* as bytes, UTF-8-encoding it when it is a ``str``.

    Used for the backup_label / tablespace_map contents returned by
    pg_backup_stop(), which are written with ``Path.write_bytes`` so that no
    platform newline translation alters them.
    """
    return value.encode() if isinstance(value, str) else value


def _make_copytree_ignore(original_dir):
    """Return the ``ignore`` callable used to copy *original_dir* with ``shutil.copytree``.

    Entries named ``pg_wal``, ``postmaster.pid``, ``postmaster.opts`` or
    ``pg_control`` are skipped at every directory level. main() recreates
    ``pg_wal`` empty and copies ``global/pg_control`` separately once the
    backup has stopped.

    Everything *inside* ``<original_dir>/pg_replslot`` is skipped as well, so
    the clone gets an empty ``pg_replslot`` directory. Otherwise the original
    server's replication slots, including the persistent slot main() creates
    just before copying, would become part of the clone. The PostgreSQL
    backup docs recommend omitting these files.
    """
    skip_by_name = shutil.ignore_patterns(
        "pg_wal", "postmaster.pid", "postmaster.opts", "pg_control"
    )
    replslot_dir = Path(original_dir) / "pg_replslot"

    def ignore(src, names):
        if Path(src) == replslot_dir:
            return set(names)
        return skip_by_name(src, names)

    return ignore


def main():
    """Clone a running PostgreSQL data directory using a low-level online backup.

    Steps:
    1. Validate the arguments.
    2. On the original server, create a physical replication slot and start
       a backup with pg_backup_start().
    3. Copy the data directory with ``shutil.copytree``.
    4. Stop the backup and write ``backup_label`` (and ``tablespace_map``
       when non-empty).
    5. Stream the WAL written during the backup into the clone's ``pg_wal``
       with pg_receivewal.

    Once created, the replication slot is dropped on every exit path, and
    the database connection is always closed. User-facing errors terminate
    with ``sys.exit(1)``.
    """
    parser = argparse.ArgumentParser(description="Clone a PostgreSQL data directory")
    parser.add_argument("original_dir", help="Path to the original PostgreSQL data directory")
    parser.add_argument("clone_dir", help="Path for the cloned database")
    parser.add_argument(
        "--start-fast",
        action="store_true",
        default=False,
        help="Force immediate checkpoint when starting backup",
    )
    args = parser.parse_args()
    original_dir = Path(args.original_dir)
    clone_dir = Path(args.clone_dir)
    pg_receivewal_path = get_pg_receivewal_path(original_dir)

    if clone_dir.exists() and any(clone_dir.iterdir()):
        logger.error(f"Error: clone directory {clone_dir} already exists and is not empty")
        sys.exit(1)

    # test if the original_dir and clone_dir are on the same device,
    # otherwise reflink cloning will definitely not work.
    # An existing (empty) clone_dir may itself be a mount point, so check it
    # directly; otherwise check its parent, where it will be created.
    device_probe = clone_dir if clone_dir.exists() else clone_dir.parent
    if os.stat(original_dir).st_dev != os.stat(device_probe).st_dev:
        logger.error(f"Error: {original_dir} and {clone_dir} must be on the same filesystem")
        sys.exit(1)

    # check for tablespaces (symlinks in pg_tblspc/ subdirectory)
    pg_tblspc_dir = original_dir / "pg_tblspc"
    if pg_tblspc_dir.exists():
        for entry in pg_tblspc_dir.iterdir():
            if entry.is_symlink():
                logger.error(f"Error: symlink found in {pg_tblspc_dir}, indicating tablespace usage. Tablespaces are not supported.")
                sys.exit(1)

    # create directory for clone
    logger.info(f"Creating clone directory: {clone_dir}")
    os.makedirs(clone_dir, exist_ok=True)

    # make sure data directory cannot be started before we finish properly
    backup_label_path = clone_dir / "backup_label"
    logger.info(f"FILE: write {backup_label_path}")
    backup_label_path.write_text(INCOMPLETE_BACKUP_LABEL)

    # open a connection to the original database, we'll use this to create a
    # replication slot, and start and stop the backup, and drop the slot later.
    conn_str = conn_str_from_postmaster_pid(original_dir)
    try:
        conn = connect(conn_str)
    except psycopg.OperationalError as e:
        logger.error(f"Error: could not connect to database: {e}")
        sys.exit(1)
    conn.autocommit = True

    slot_name = generate_slot_name()
    # Only drop the slot during cleanup if it was actually created; dropping a
    # nonexistent slot would raise and bury the error that got us there.
    slot_created = False
    try:
        # create replication slot so that we can later pull all the required WAL
        with conn.cursor() as cur:
            logger.info(f"DB: SELECT pg_create_physical_replication_slot('{slot_name}', TRUE)")
            cur.execute("SELECT pg_create_physical_replication_slot(%s, TRUE)", (slot_name,))
            cur.fetchone()
            slot_created = True

            # start backup
            logger.info(f"DB: SELECT pg_backup_start('pg_refclone', {args.start_fast})")
            cur.execute("SELECT pg_backup_start('pg_refclone', %s)", (args.start_fast,))
            start_lsn = cur.fetchone()[0]
            logger.info(f"BACKUP START LSN: {start_lsn}")

        # do the reflink copy
        # NOTE: shutil.copytree in Python 3.14+ will use optimizations like copy_file_range(2) for
        # local file copies when copy_function=shutil.copy2 is used.
        # See https://docs.python.org/3/library/shutil.html#platform-dependent-efficient-copy-operations
        # We will not recreate symlinks, but instead copy the symlinked files directly.
        # Recreating the symlinks would result in two database directories
        # linking to the same exact files on disk, leading to corruption.
        logger.info(f"FILE: Copying {original_dir} to {clone_dir}")
        shutil.copytree(
            original_dir,
            clone_dir,
            symlinks=False,
            copy_function=shutil.copy2,
            dirs_exist_ok=True,
            ignore=_make_copytree_ignore(original_dir),
        )

        # make sure the clone is started in standby mode
        standby_signal = clone_dir / "standby.signal"
        logger.info(f"FILE: touch {standby_signal}")
        standby_signal.touch()

        # stop backup; the returned columns are lsn, labelfile, spcmapfile
        with conn.cursor(row_factory=namedtuple_row) as cur:
            logger.info("DB: SELECT * FROM pg_backup_stop(FALSE)")
            cur.execute("SELECT * FROM pg_backup_stop(FALSE)")
            stop_row = cur.fetchone()
        end_lsn = stop_row.lsn
        labelfile = stop_row.labelfile
        spcmapfile = stop_row.spcmapfile
        logger.info(f"BACKUP STOP LSN: {end_lsn}")

        # replace the placeholder with the real backup_label, written as raw
        # bytes so the content is stored exactly as returned
        logger.info(f"FILE: write {backup_label_path}")
        backup_label_path.write_bytes(_as_bytes(labelfile))

        # copy pg_control (was excluded from copytree)
        pg_control_src = original_dir / "global" / "pg_control"
        pg_control_dst = clone_dir / "global" / "pg_control"
        logger.info(f"FILE: copy {pg_control_src} -> {pg_control_dst}")
        shutil.copy2(pg_control_src, pg_control_dst)

        # tablespace_map is only to be written when pg_backup_stop() returned a
        # non-empty map (it is empty when no tablespaces exist)
        if spcmapfile:
            tablespace_map_path = clone_dir / "tablespace_map"
            logger.info(f"FILE: write {tablespace_map_path}")
            tablespace_map_path.write_bytes(_as_bytes(spcmapfile))

        # advance slot to start of backup (otherwise we get all the WAL produced before pg_backup_start was called)
        with conn.cursor() as cur:
            logger.info(f"DB: SELECT pg_replication_slot_advance('{slot_name}', {start_lsn!r})")
            cur.execute("SELECT pg_replication_slot_advance(%s, %s)", (slot_name, start_lsn))
            cur.fetchone()

        # create pg_wal directory (excluded from copytree)
        pg_wal_path = clone_dir / "pg_wal"
        logger.info(f"FILE: mkdir {pg_wal_path}")
        pg_wal_path.mkdir()

        # pull all the WAL that we need; --endpos (-E) makes pg_receivewal exit
        # once it has received WAL up to the backup end LSN
        logger.info(f"PG_RECEIVEWAL: launch pg_receivewal to stream WAL to {pg_wal_path}")
        receivewal = subprocess.run(
            [
                pg_receivewal_path,
                "-v",
                "-d", conn_str,
                "-D", str(pg_wal_path) + "/",
                "-S", slot_name,
                "-E", str(end_lsn),
                "-s", "1",
            ],
            capture_output=True,
            text=True,
        )
        logger.info(f"PG_RECEIVEWAL output begin:\n{receivewal.stdout}\nPG_RECEIVEWAL output end:")
        if receivewal.returncode != 0:
            logger.error(f"PG_RECEIVEWAL error begin:\n{receivewal.stderr}\nPG_RECEIVEWAL error end:")
            sys.exit(1)
        # pg_receivewal reports its -v progress messages on stderr; keep them
        # visible on success too
        logger.debug(f"PG_RECEIVEWAL stderr begin:\n{receivewal.stderr}\nPG_RECEIVEWAL stderr end:")

        # get the WAL file name matching the backup end LSN (passed as a bound parameter)
        with conn.cursor() as cur:
            logger.info(f"DB: SELECT pg_walfile_name({end_lsn!r})")
            cur.execute("SELECT pg_walfile_name(%s)", (end_lsn,))
            stop_walfile = cur.fetchone()[0]
        logger.info(f"BACKUP STOP WAL FILE: {stop_walfile}")

        # rename the WAL file containing the backup stop LSN,
        # as pg_receivewal keeps it as a ".partial" file,
        # even though pg_backup_stop() switches to a new file.
        partial_wal = pg_wal_path / f"{stop_walfile}.partial"
        if partial_wal.exists():
            final_wal = pg_wal_path / stop_walfile
            logger.info(f"FILE: rename {partial_wal} -> {final_wal}")
            partial_wal.rename(final_wal)
    finally:
        # Drop the slot (if we created it) and always close the connection,
        # even if the drop itself fails.
        try:
            if slot_created:
                with conn.cursor() as cur:
                    logger.info(f"DB: SELECT pg_drop_replication_slot('{slot_name}')")
                    cur.execute("SELECT pg_drop_replication_slot(%s)", (slot_name,))
        finally:
            conn.close()


if __name__ == "__main__":
    main()