#!/usr/local/bin/perl
# The shebang must stand alone on the first line; everything after a '#'
# is a comment, so the shell trick and the pragmas need their own lines.
eval 'exec /usr/local/bin/perl -S $0 ${1+"$@"}'
    if 0; # not running under some shell

# This is a program to dump sets of MySQL tables in parallel, via mysqldump or
# SELECT INTO OUTFILE.
#
# This program is copyright (c) 2007 Baron Schwartz.  Feedback and improvements
# are welcome.
#
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, version 2; OR the Perl Artistic License.  On UNIX and similar
# systems, you can issue `man perlgpl' or `man perlartistic' to read these
# licenses.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
# Place, Suite 330, Boston, MA  02111-1307  USA.

use strict;
use warnings FATAL => 'all';

# ###########################################################################
# This is a combination of modules and programs in one -- a runnable module.
# http://www.perl.com/pub/a/2006/07/13/lightning-articles.html?page=last
# Or, look it up in the Camel book on pages 642 and 643 in the 3rd edition.
# ###########################################################################

# ###########################################################################
# OptionParser package
# ###########################################################################
use strict;
use warnings FATAL => 'all';

package OptionParser;

use Getopt::Long;
use List::Util qw(max);

# Constructor.  @opts is a list of hashrefs, each with the keys
#   s => a Getopt::Long specification such as 'chunksize|C=s'
#   d => the one-line description printed by usage()
sub new {
   my ( $class, @opts ) = @_;
   return bless { specs => \@opts }, $class;
}

# Parses @ARGV with Getopt::Long and returns a hash of option values.
# %defaults supplies initial values, keyed by the short name when the spec
# has one and by the long name otherwise.  Dies on a duplicate option key or
# on a default for an option that is not in the spec.  When GetOptions()
# reports an error, the 'help' value is set so the caller prints usage.
# As a side effect each spec hashref gets the keys k (value key), l (long
# name), t (short name) and n (true when the option is negatable).
sub parse {
   my ( $self, %defaults ) = @_;
   my @specs = @{$self->{specs}};

   my %opt_seen;
   foreach my $spec ( @specs ) {
      my ( $long, $short ) = $spec->{s} =~ m/^([\w-]+)(?:\|([^!+=]*))?/;
      $spec->{k} = $short || $long;
      $spec->{l} = $long;
      $spec->{t} = $short;
      $spec->{n} = $spec->{s} =~ m/!/;
      $defaults{$spec->{k}} = undef unless defined $defaults{$spec->{k}};
      die "Duplicate option $spec->{k}" if $opt_seen{$spec->{k}}++;
   }

   foreach my $key ( keys %defaults ) {
      die "No such option '$key'\n" unless exists $opt_seen{$key};
   }

   Getopt::Long::Configure('no_ignore_case', 'bundling');
   # The parentheses make it unambiguous that the braces are a block.
   GetOptions( map { ( $_->{s} => \$defaults{ $_->{k} } ) } @specs )
      or $defaults{help} = 1;
   return %defaults;
}

# Returns the option summary as a string, one line per option, sorted by
# long name.  Relies on the l/t/n keys that parse() stores in each spec, so
# parse() must have been called first.
sub usage {
   my ( $self ) = @_;
   my @specs = @{$self->{specs}};
   # Negatable options are shown with a "[no]" prefix, hence the extra 4.
   my $maxw  = max(map { length($_->{l}) + ($_->{n} ? 4 : 0) } @specs);
   my $usage = '';
   foreach my $spec ( sort { $a->{l} cmp $b->{l} } @specs ) {
      my $long  = $spec->{n} ? "[no]$spec->{l}" : $spec->{l};
      my $short = $spec->{t} ? "-$spec->{t}" : '';
      $usage .= sprintf(" --%-${maxw}s %-4s %s\n", $long, $short, $spec->{d});
   }
   return $usage;
}

1;

# ###########################################################################
# End OptionParser package
# ###########################################################################

# ###########################################################################
# TableParser package
# ###########################################################################
use strict;
use warnings FATAL => 'all';

package TableParser;

# Constructor; the object carries no state.
sub new {
   bless {}, shift;
}

# Parses the output of SHOW CREATE TABLE ($ddl) and returns a hashref
# describing the table: cols, is_col, null_cols, is_nullable, keys, defs,
# numeric_cols, is_numeric, engine and type_for.  $opts is optional; its
# mysql_version member (a zero-padded string such as '004001000') is used to
# decide the index type on HEAP/MEMORY tables.  Dies when the DDL is not
# backtick-quoted or when a column type cannot be determined.
sub parse {
   my ( $self, $ddl, $opts ) = @_;

   if ( $ddl !~ m/CREATE TABLE `/ ) {
      die "Cannot parse table definition; is ANSI quoting enabled or SQL_QUOTE_SHOW_CREATE disabled?";
   }

   my ( $engine ) = $ddl =~ m/\) (?:ENGINE|TYPE)=(\w+)/;

   # Column definitions are the indented lines that begin with a backtick.
   my @defs = $ddl =~ m/^(\s+`.*?),?$/gm;
   my @cols = map { $_ =~ m/`([^`]+)`/g } @defs;

   my %def_for;
   @def_for{@cols} = @defs;

   my @nums = map  { $_ =~ m/`([^`]+)`/g }
              grep { $_ =~ m/`[^`]+` (?:(?:tiny|big|medium|small)?int|float|double|decimal|year)/ }
              @defs;
   my %is_numeric = map { $_ => 1 } @nums;

   my %type_for;
   foreach my $col ( @cols ) {
      my $def = $def_for{$col};
      my ( $type ) = $def =~ m/`[^`]+`\s([a-z]+)/;
      die "Can't determine column type for $def" unless $type;
      $type_for{$col} = $type;
   }

   my @null;
   foreach my $col ( @cols ) {
      my $def = $def_for{$col};
      next if $def =~ m/NOT NULL/ || $def =~ m/text$/;
      push @null, $col;
   }
   my %is_nullable = map { $_ => 1 } @null;

   my %keys;
   # SHOW CREATE TABLE indents each index definition by exactly two spaces,
   # so the pattern has to match two spaces or it never finds a key.
   foreach my $key ( $ddl =~ m/^  ((?:[A-Z]+ )?KEY .*)$/gm ) {

      # Only MEMORY/HEAP tables really have HASH indexes.
      if ( $engine !~ m/MEMORY|HEAP/ ) {
         $key =~ s/USING HASH/USING BTREE/;
      }

      my ( $type, $cols ) = $key =~ m/(?:USING (\w+))? \((.+)\)/;
      my ( $special )     = $key =~ m/(FULLTEXT|SPATIAL)/;
      $type = $type || $special || 'BTREE';
      if ( $opts->{mysql_version} && $opts->{mysql_version} lt '004001000'
         && $engine =~ m/HEAP|MEMORY/i )
      {
         $type = 'HASH'; # MySQL pre-4.1 supports only HASH indexes on HEAP
      }

      my ( $name )  = $key =~ m/(PRIMARY|`[^`]*`)/;
      my $unique    = $key =~ m/PRIMARY|UNIQUE/ ? 1 : 0;
      my @key_cols  = grep { m/[^,]/ } split('`', $cols);
      $name         =~ s/`//g;

      $keys{$name} = {
         colnames    => $cols,
         cols        => \@key_cols,
         unique      => $unique,
         is_col      => { map { $_ => 1 } @key_cols },
         is_nullable => scalar(grep { $is_nullable{$_} } @key_cols),
         type        => $type,
      };
   }

   return {
      cols         => \@cols,
      is_col       => { map { $_ => 1 } @cols },
      null_cols    => \@null,
      is_nullable  => \%is_nullable,
      keys         => \%keys,
      defs         => \%def_for,
      numeric_cols => \@nums,
      is_numeric   => \%is_numeric,
      engine       => $engine,
      type_for     => \%type_for,
   };
}

# Returns the CREATE TABLE statement for `$db`.`$tbl`.  ANSI_QUOTES is
# switched off and SQL_QUOTE_SHOW_CREATE switched on for the duration of the
# query (on servers that understand the version comment) so parse() gets
# backtick-quoted identifiers; the previous settings are restored afterwards.
sub get_ddl {
   my ( $self, $dbh, $db, $tbl ) = @_;
   $dbh->do('/*!40101 SET @OLD_SQL_MODE := @@SQL_MODE, '
      . '@@SQL_MODE := REPLACE(REPLACE(@@SQL_MODE, "ANSI_QUOTES", ""), ",,", ","), '
      . '@OLD_QUOTE := @@SQL_QUOTE_SHOW_CREATE, '
      . '@@SQL_QUOTE_SHOW_CREATE := 1 */');
   my $href = $dbh->selectrow_hashref("SHOW CREATE TABLE `$db`.`$tbl`");
   $dbh->do('/*!40101 SET @@SQL_MODE := @OLD_SQL_MODE, '
      . '@@SQL_QUOTE_SHOW_CREATE := @OLD_QUOTE */');
   # The column name's letter case depends on the handle's settings.
   my ( $key ) = grep { m/create table/i } keys %$href;
   return $href->{$key};
}

1;

# ###########################################################################
# End TableParser package
# ###########################################################################

# ###########################################################################
# TableChunker package
# ###########################################################################
use strict;
use warnings FATAL => 'all';

package TableChunker;

use POSIX qw(ceil);
use List::Util qw(min max);

# Constructor; the object carries no state.
sub new {
   bless {}, shift;
}

my $EPOCH      = '1970-01-01';
my %int_types  = map { $_ => 1 }
   qw( bigint date datetime int mediumint smallint time timestamp tinyint year );
my %real_types = map { $_ => 1 }
   qw( decimal double float );

# Finds the columns a table can be chunked on.  $table is a hashref as
# returned by TableParser::parse().  Candidates are the first columns of
# BTREE indexes whose type is one of the integer-like or real types above.
# With $opts->{exact}, single-column unique indexes are tried first.
# Returns ($can_chunk_exact, \@columns); the first column of the primary key
# comes first when it is a candidate, the rest follow in table column order.
sub find_chunk_columns {
   my ( $self, $table, $opts ) = @_;
   $opts ||= {};

   my @candidate_cols;
   my @possible_keys = grep { $_->{type} eq 'BTREE' } values %{$table->{keys}};

   my $can_chunk_exact = 0;
   if ( $opts->{exact} ) {
      @candidate_cols =
         grep {
            $int_types{$table->{type_for}->{$_}}
            || $real_types{$table->{type_for}->{$_}}
         }
         map  { $_->{cols}->[0] }
         grep { $_->{unique} && @{$_->{cols}} == 1 }
         @possible_keys;
      if ( @candidate_cols ) {
         $can_chunk_exact = 1;
      }
   }

   if ( !@candidate_cols ) {
      @candidate_cols =
         grep {
            $int_types{$table->{type_for}->{$_}}
            || $real_types{$table->{type_for}->{$_}}
         }
         map { $_->{cols}->[0] }
         @possible_keys;
   }

   my @result;
   if ( $table->{keys}->{PRIMARY} ) {
      my $pk_first_col = $table->{keys}->{PRIMARY}->{cols}->[0];
      @result          = grep { $_ eq $pk_first_col } @candidate_cols;
      @candidate_cols  = grep { $_ ne $pk_first_col } @candidate_cols;
   }
   my $i = 0;
   my %col_pos = map { $_ => $i++ } @{$table->{cols}};
   push @result, sort { $col_pos{$a} <=> $col_pos{$b} } @candidate_cols;

   return ($can_chunk_exact, \@result);
}

# Returns a list of WHERE-clause fragments that together cover the table.
# Required named arguments: table (from TableParser::parse), col, min, max,
# rows_in_range, size (rows per chunk) and dbh.  Optional: exact (use size
# as the interval directly).  The first chunk is open-ended downwards, the
# last is open-ended upwards, and a nullable column gets an extra IS NULL
# chunk.  Returns just '1=1' when the range is empty.  Dies on a missing
# argument or a column type it cannot chunk.
sub calculate_chunks {
   my ( $self, %args ) = @_;
   foreach my $arg ( qw(table col min max rows_in_range size dbh) ) {
      die "Required argument $arg not given or undefined"
         unless defined $args{$arg};
   }

   my @chunks;
   my ( $range_func, $start_point, $end_point );
   my $col_type = $args{table}->{type_for}->{$args{col}};

   # Temporal values are converted to numbers on the server so the interval
   # arithmetic below is the same for every type.
   if ( $col_type =~ m/(?:int|year|float|double|decimal)$/ ) {
      $start_point = $args{min};
      $end_point   = $args{max};
      $range_func  = 'range_num';
   }
   elsif ( $col_type eq 'timestamp' ) {
      ( $start_point, $end_point ) = $args{dbh}->selectrow_array(
         "SELECT UNIX_TIMESTAMP('$args{min}'), UNIX_TIMESTAMP('$args{max}')");
      $range_func = 'range_timestamp';
   }
   elsif ( $col_type eq 'date' ) {
      ( $start_point, $end_point ) = $args{dbh}->selectrow_array(
         "SELECT TO_DAYS('$args{min}'), TO_DAYS('$args{max}')");
      $range_func = 'range_date';
   }
   elsif ( $col_type eq 'time' ) {
      ( $start_point, $end_point ) = $args{dbh}->selectrow_array(
         "SELECT TIME_TO_SEC('$args{min}'), TIME_TO_SEC('$args{max}')");
      $range_func = 'range_time';
   }
   elsif ( $col_type eq 'datetime' ) {
      $start_point = $self->timestampdiff($args{dbh}, $args{min});
      $end_point   = $self->timestampdiff($args{dbh}, $args{max});
      $range_func  = 'range_datetime';
   }
   else {
      die "I don't know how to chunk $col_type\n";
   }

   if ( !defined $start_point ) {
      $start_point = 0;
   }
   if ( !defined $end_point || $end_point < $start_point ) {
      $end_point = 0;
   }

   # Scale the requested rows-per-chunk to a distance in column values.
   my $interval = $args{size} * ($end_point - $start_point) / $args{rows_in_range};
   if ( $int_types{$col_type} ) {
      $interval = ceil($interval);
   }
   $interval ||= $args{size};
   if ( $args{exact} ) {
      $interval = $args{size};
   }

   my $col = "`$args{col}`";
   if ( $start_point < $end_point ) {
      my ( $beg, $end );
      my $iter = 0;
      for ( my $i = $start_point; $i < $end_point; $i += $interval ) {
         ( $beg, $end ) = $self->$range_func($args{dbh}, $i, $interval, $end_point);
         if ( $iter++ == 0 ) {
            push @chunks, "$col < " . $self->quote($end);
         }
         else {
            push @chunks, "$col >= " . $self->quote($beg)
               . " AND $col < " . $self->quote($end);
         }
      }

      # Replace the last chunk with one that has no upper bound, so rows at
      # or beyond the maximum are included.
      my $nullable = $args{table}->{is_nullable}->{$args{col}};
      pop @chunks;
      if ( @chunks ) {
         push @chunks, "$col >= " . $self->quote($beg);
      }
      else {
         push @chunks, $nullable ? "$col IS NOT NULL" : '1=1';
      }
      if ( $nullable ) {
         push @chunks, "$col IS NULL";
      }
   }
   else {
      push @chunks, '1=1';
   }

   return @chunks;
}

# Returns the name of the preferred chunk column for $table, or undef when
# find_chunk_columns() offers none.
sub get_first_chunkable_column {
   my ( $self, $table, $opts ) = @_;
   my ( $exact, $cols ) = $self->find_chunk_columns($table, $opts);
   return $cols->[0];
}

# Converts a size in bytes to a number of rows using the table's
# avg_row_length from SHOW TABLE STATUS.  $cache is an optional hashref,
# $cache->{db}->{tbl} => status row, that is consulted first and filled in
# on a miss.  Returns undef when the average row length is zero or unknown.
sub size_to_rows {
   my ( $self, $dbh, $db, $tbl, $size, $cache ) = @_;
   my $avg_row_length;
   my $status;
   if ( !$cache || !($status = $cache->{$db}->{$tbl}) ) {
      # Escape '_' for LIKE in a copy, so the cache is still keyed by the
      # real table name and later lookups hit.
      ( my $like = $tbl ) =~ s/_/\\_/g;
      my $sth = $dbh->prepare(
         "SHOW TABLE STATUS FROM `$db` LIKE '$like'");
      $sth->execute;
      $status = $sth->fetchrow_hashref();
      if ( $cache ) {
         $cache->{$db}->{$tbl} = $status;
      }
   }
   # The column name's letter case depends on the handle's settings.
   my ( $key ) = grep { /avg_row_length/i } keys %$status;
   $avg_row_length = $status->{$key};
   return $avg_row_length ? ceil($size / $avg_row_length) : undef;
}

# Returns a hash with min and max (of `$col`) and rows_in_range (the
# optimizer's row estimate from EXPLAIN) for `$db`.`$tbl`.  $opts is
# accepted but not used.
sub get_range_statistics {
   my ( $self, $dbh, $db, $tbl, $col, $opts ) = @_;
   my ( $min, $max ) = $dbh->selectrow_array(
      "SELECT MIN(`$col`), MAX(`$col`) FROM `$db`.`$tbl`");
   my $expl = $dbh->selectrow_hashref(
      "EXPLAIN SELECT * FROM `$db`.`$tbl`");
   return (
      min           => $min,
      max           => $max,
      rows_in_range => $expl->{rows},
   );
}

# Wraps a value in double quotes when it looks like a date or time (a digit
# followed by ':' or '-'); other values are returned unchanged.
sub quote {
   my ( $self, $val ) = @_;
   return $val =~ m/\d[:-]/ ? qq{"$val"} : $val;
}

# The range_* functions take ($dbh, $start, $interval, $max) and return the
# (begin, end) values of one chunk, with end capped at $max.

# Numeric range, computed locally and truncated to five decimal places.
# Dies when the truncated end is not greater than the start.
sub range_num {
   my ( $self, $dbh, $start, $interval, $max ) = @_;
   my $end = min($max, $start + $interval);
   $start =~ s/\.(\d{5}).*$/.$1/;
   $end   =~ s/\.(\d{5}).*$/.$1/;
   if ( $end > $start ) {
      return ( $start, $end );
   }
   else {
      die "Chunk size is too small: $end !> $start\n";
   }
}

# TIME range; the arguments are seconds.
sub range_time {
   my ( $self, $dbh, $start, $interval, $max ) = @_;
   return $dbh->selectrow_array(
      "SELECT SEC_TO_TIME($start), SEC_TO_TIME(LEAST($max, $start + $interval))");
}

# DATE range; the arguments are day numbers as produced by TO_DAYS().
sub range_date {
   my ( $self, $dbh, $start, $interval, $max ) = @_;
   return $dbh->selectrow_array(
      "SELECT FROM_DAYS($start), FROM_DAYS(LEAST($max, $start + $interval))");
}

# DATETIME range; the arguments are seconds since $EPOCH.
sub range_datetime {
   my ( $self, $dbh, $start, $interval, $max ) = @_;
   return $dbh->selectrow_array(
      "SELECT DATE_ADD('$EPOCH', INTERVAL $start SECOND), "
      . "DATE_ADD('$EPOCH', INTERVAL LEAST($max, $start + $interval) SECOND)");
}

# TIMESTAMP range; the arguments are UNIX timestamps.
sub range_timestamp {
   my ( $self, $dbh, $start, $interval, $max ) = @_;
   return $dbh->selectrow_array(
      "SELECT FROM_UNIXTIME($start), FROM_UNIXTIME(LEAST($max, $start + $interval))");
}

# Returns the number of seconds between $EPOCH and the datetime $time, as
# computed by the server.  The result is converted back with DATE_ADD() and
# the function dies when the round trip does not give $time again.
sub timestampdiff {
   my ( $self, $dbh, $time ) = @_;
   my ( $diff ) = $dbh->selectrow_array(
      "SELECT (TO_DAYS('$time') * 86400 + TIME_TO_SEC('$time')) "
      . "- TO_DAYS('$EPOCH 00:00:00') * 86400");
   my ( $check ) = $dbh->selectrow_array(
      "SELECT DATE_ADD('$EPOCH', INTERVAL $diff SECOND)");
   my $shown = defined $check ? $check : 'NULL';
   die "Incorrect datetime math: given $time, calculated $diff but checked to $shown.\n"
      . "This is probably because you are using a version of MySQL that overflows on\n"
      . "large interval values to DATE_ADD(). If not, please report this as a bug.\n"
      unless defined $check && $check eq $time;
   return $diff;
}

1;

# ###########################################################################
# End TableChunker package
# ###########################################################################

package main;

use DBI;
use English qw(-no_match_vars);
use File::Basename qw(dirname);
use File::Spec;
use List::Util qw(max sum);
use POSIX;
use Time::HiRes qw(time);

our $VERSION = '0.9.10';
our $DISTRIB = '1053';
# The match is sliced in list context; writing "m/(\d+)/g || 0" would put
# it in scalar context and always yield 1.
our $SVN_REV = sprintf("%d", ( q$Revision: 1046 $ =~ m/(\d+)/ )[0] || 0);

# Globals -- as few as possible.
my %opts;
my $default_dir;
my @mysqldump_args;
# Maps option keys to DBD::mysql DSN attribute names.
my %conn = (
   F => 'mysql_read_default_file',
   h => 'host',
   P => 'port',
   S => 'mysql_socket',
);

# Run the program only when this file is executed, not when it is loaded by
# another script.
if ( !caller ) {

   # ############################################################################
   # Get configuration information.
   # ############################################################################

   my @opt_spec = (
      { s => 'age=s',             d => 'Dump only modified or not recently dumped tables' },
      { s => 'basedir=s',         d => 'Base directory for creating files (default cwd)' },
      { s => 'binlogpos|b!',      d => 'Dump the master/slave position (default)' },
      { s => 'chunksize|C=s',     d => 'Number of rows or data size to dump per file' },
      { s => 'csv',               d => 'Do --tab dump in CSV format (implies --tab)' },
      { s => 'databases|d=s',     d => 'Dump only this comma-separated list of databases' },
      { s => 'dbregex=s',         d => 'Dump only databases whose names match this pattern' },
      { s => 'defaultset!',       d => 'When --sets given, dump tables not in any set' },
      { s => 'defaults-file|F=s', d => 'Only read default options from the given file' },
      { s => 'flushlock|k!',      d => 'Use FLUSH TABLES WITH READ LOCK (default)' },
      { s => 'flushlog!',         d => 'Execute FLUSH LOGS when getting binlog positions' },
      { s => 'gzip!',             d => 'Compress files with gzip (default on non-Win32)' },
      { s => 'host|h=s',          d => 'Connect to host' },
      { s => 'help',              d => 'Show this help message' },
      { s => 'ignoredb|g=s',      d => 'Ignore this comma-separated list of databases' },
      { s => 'ignoretbl|n=s',     d => 'Ignore this comma-separated list of tables' },
      { s => 'locktables!',       d => 'Use LOCK TABLES (implies --no-flushlock)' },
      { s => 'numthread|m=i',     d => 'Number of threads (default #CPUs or 2)' },
      { s => 'opt!',              d => 'Use sensible mysqldump options (enabled by default)' },
      { s => 'password|p=s',      d => 'Password to use when connecting' },
      { s => 'port|P=i',          d => 'Port number to use for connection' },
      { s => 'quiet|q',           d => 'Set --verbose to 0' },
      { s => 'sets=s',            d => 'Dump this comma-separated list of sets' },
      { s => 'setperdb',          d => 'Dump each database as a separate set' },
      { s => 'settable=s',        d => 'database.table where backup sets are stored' },
      { s => 'socket|S=s',        d => 'Socket file to use for connection' },
      { s => 'tab|T',             d => 'Dump tab-separated (sets --umask 0)' },
      { s => 'tables|t=s',        d => 'Dump only this comma-separated list of tables' },
      { s => 'test',              d => 'Print commands instead of executing them' },
      { s => 'tblregex=s',        d => 'Dump only tables whose names match this pattern' },
      { s => 'umask=s',           d => 'Set umask to this value, in octal' },
      { s => 'user|u=s',          d => 'User for login if not current user' },
      { s => 'verbose|v+',        d => 'Verbosity (default 1, can specify multiple times)' },
      { s => 'version',           d => 'Output version information and exit' },
      { s => 'wait|w=s',          d => 'Wait limit when server is down (default 5m)' },
   );

   # Holds command-line options.
   %opts = (
      b       => 1,
      opt     => 1,
      gzip    => $OSNAME !~ m/Win32/,
      basedir => File::Spec->curdir(),
      w       => '5m',
      sets    => '',
      v       => 1,
      C       => '',
   );

   my $opt_parser = OptionParser->new(@opt_spec);
   %opts = $opt_parser->parse(%opts);

   # ############################################################################
   # Process options.
   # ############################################################################

   $opts{basedir} = File::Spec->rel2abs($opts{basedir});

   if ( $opts{q} ) {
      $opts{v} = 0;
   }

   if ( !$opts{m} ) {
      eval {
         # Try to read --numthread from the number of CPUs in /proc/cpuinfo.
         # This only works on GNU/Linux.
         open my $file, "<", "/proc/cpuinfo" or die $OS_ERROR;
         local $INPUT_RECORD_SEPARATOR = undef;
         my $contents = <$file>;
         close $file;
         # Count only the "processor : N" lines, not the word "processor"
         # wherever it happens to appear.
         my @cpus = $contents =~ m/^processor\b/gm;
         $opts{m} = scalar @cpus;
      };
      $opts{m} ||= $ENV{NUMBER_OF_PROCESSORS}; # MSWin32
      $opts{m} = max(2, $opts{m} || 0);
   }

   # Set locking options.  The whole server is locked unless sets are in use
   # or LOCK TABLES was asked for.
   my $lock_all = ( $opts{sets} || $opts{locktables} || $opts{setperdb} ) ? 0 : 1;
   $opts{k}          = $lock_all  unless defined $opts{k};
   $opts{locktables} = !$lock_all unless defined $opts{locktables};

   if ( !$opts{help} ) {
      if ( $opts{C} && $opts{C} !~ m/^\d+[kGM]?$/ ) {
         warn "Invalid --chunksize argument\n";
         $opts{help} = 1;
      }

      if ( $opts{defaultset} && !$opts{sets} ) {
         warn "--defaultset has no effect without --sets\n";
         $opts{help} = 1;
      }

      if ( !$opts{m} ) {
         warn "You must specify --numthread\n";
         $opts{help} = 1;
      }

      if ( !$opts{help} && $opts{k} && $opts{locktables} ) {
         warn "--locktables and --flushlock are mutually exclusive\n";
         $opts{help} = 1;
      }

      # --age and --wait are durations; store them as seconds.
      if ( !$opts{help} && $opts{age} ) {
         my $seconds = duration_to_seconds($opts{age});
         if ( $seconds ) {
            $opts{age} = $seconds;
         }
         else {
            warn "Invalid --age argument\n";
            $opts{help} = 1;
         }
      }

      if ( !$opts{help} ) {
         my $seconds = duration_to_seconds($opts{w});
         if ( $seconds ) {
            $opts{w} = $seconds;
         }
         else {
            warn "Invalid --wait argument\n";
            $opts{help} = 1;
         }
      }

      if ( !$opts{help} && $opts{sets} && !$opts{settable} ) {
         warn "--sets requires --settable\n";
         $opts{help} = 1;
      }

      if ( !$opts{help} && $opts{setperdb} && $opts{sets} ) {
         warn "--sets and --setperdb are mutually exclusive\n";
         $opts{help} = 1;
      }
   }

   if ( $opts{csv} ) {
      $opts{T} = 1;
   }

   if ( $opts{T} ) {
      if ( !defined $opts{umask} ) {
         $opts{umask} = 0;
      }
   }

   if ( defined $opts{umask} ) {
      umask oct($opts{umask});
   }

   # Gather connection parameters to pass to mysqldump.  Order matters;
   # mysqldump will have a problem if --defaults-file isn't first.
   my @conn_params = (
      [qw(--defaults-file F)],
      [qw(--host          h)],
      [qw(--password      p)],
      [qw(--port          P)],
      [qw(--socket        S)],
      [qw(--user          u)],
   );
   @conn_params = map  { "$_->[0]='$opts{$_->[1]}'" }
                  grep { defined $opts{$_->[1]} }
                  @conn_params;

   # Decide on options to mysqldump.
   if ( $opts{opt} && !@ARGV ) { # Sensible defaults.
      $default_dir = 1;
      @mysqldump_args = (
         qw(mysqldump),
         @conn_params,
         ( $opts{T} ? qw(--no-data) : () ),
         qw(
            --add-drop-table --add-locks --allow-keywords --comments
            --complete-insert --create-options --disable-keys
            --extended-insert --quick --quote-names --set-charset
            --skip-lock-tables --triggers --tz-utc
            '%D' '%N'
         ),
      );
      if ( $opts{C} ) {
         push @mysqldump_args, qw( --where '%W' );
      }
      if ( $opts{gzip} ) {
         push @mysqldump_args,
            qw( | gzip --force --fast --stdout - > ),
            '"' . filename('%S', '%D', '%N.%3C.sql.gz') . '"';
      }
      else {
         push @mysqldump_args,
            '--result-file="' . filename('%S', '%D', '%N.%3C.sql') . '"';
      }
   }
   else {
      # Whatever is left on the command line is the command to run.
      @mysqldump_args = @ARGV;
   }

   if ( $opts{version} ) {
      print "mysql-parallel-dump  Ver $VERSION Distrib $DISTRIB Changeset $SVN_REV\n";
      exit(0);
   }

   if ( $opts{help} ) {
      print "Usage: mysql-parallel-dump [OPTION]... [--] [COMMAND]\n\n";
      print $opt_parser->usage();
      print "\n"
         . "mysql-parallel-dump dumps sets of MySQL tables simultaneously via mysqldump or\n"
         . "SELECT INTO OUTFILE.  For more details, please read the documentation:\n"
         . "\n"
         . "   perldoc mysql-parallel-dump\n"
         . "\n";
      exit(0);
   }

   # Make comma-separated lists into hashes.
   if ( $opts{d} ) {
      $opts{d} = { map { $_ => 1 } split(/,\s*/, $opts{d}) };
   }
   $opts{g} = { map { $_ => 1 } split(/,\s*/, $opts{g} || '') };
   if ( $opts{t} ) {
      $opts{t} = { map { $_ => 1 } split(/,\s*/, $opts{t}) };
   }
   $opts{n} = { map { $_ => 1 } split(/,\s*/, $opts{n} || '') };

   # ############################################################################
   # Connect.
   # ############################################################################
   my $dbh = get_dbh();
   $dbh->{InactiveDestroy}  = 1;         # Don't die on fork().
   $dbh->{FetchHashKeyName} = 'NAME_lc'; # Lowercases all column names for fetchrow_hashref()

   # This signal handler will do nothing but wake up the sleeping parent process
   # and record the exit status and time of the child that exited (as a side
   # effect of not discarding the signal).
   my %exited_children;
   $SIG{CHLD} = sub {
      my $kid;
      while ( ($kid = waitpid(-1, POSIX::WNOHANG)) > 0 ) {
         # Must right-shift to get the actual exit status of the child.
         $exited_children{$kid}->{exit_status} = $CHILD_ERROR >> 8;
         $exited_children{$kid}->{exit_time}   = time();
      }
   };

   # ############################################################################
   # Derive a last-modified timestamp
   # ############################################################################
   my ( $age ) = $opts{age}
      ? $dbh->selectrow_array("SELECT DATE_SUB(NOW(), INTERVAL $opts{age} SECOND)")
      : undef;

   # ############################################################################
   # Lock the whole server if desired.
   # ############################################################################
   if ( $opts{k} && !$opts{test} ) {
      $dbh->do('FLUSH TABLES WITH READ LOCK');
   }

   # ############################################################################
   # Iterate over "sets" of tables.
   # ############################################################################
   my %tables_in_sets;
   my %tables_for_set;
   my %stats_for_set;
   my @sets_to_do = unique($opts{sets} =~ m/(\w+)/g);

   # Fetch backup sets from the database.
   my $backedup_sth;
   if ( $opts{sets} ) {
      foreach my $set ( @sets_to_do ) {
         die "'default' is a reserved set; don't use it\n" if lc $set eq 'default';
         my $sql = "SELECT `db`, `tbl` "
                 . "FROM $opts{settable} "
                 . "WHERE `setname` = '$set' "
                 . ($age ? "AND `ts` <= '$age' " : '')
                 . "ORDER BY `priority`, `db`, `tbl`";
         print $sql, "\n" if $opts{test};
         my $result = $dbh->selectall_arrayref($sql, { Slice => {} } );
         foreach my $row ( @$result ) {
            $stats_for_set{$set}->{tables}++;
            $tables_in_sets{$row->{db}}->{$row->{tbl}}++;
            push @{$tables_for_set{$set}}, [ $row->{db}, $row->{tbl} ];
         }
      }

      if ( $age ) {
         $backedup_sth = $dbh->prepare(
            "UPDATE $opts{settable} AS `mysql_parallel_dump_writable` SET `ts` = NOW() "
            . " WHERE `setname` = ? AND `db` = ? AND `tbl` = ?");
      }
   }

   my %databases_for;
   my @databases;
   my %table_status;

   # Do all databases and tables in a 'default' set, possibly excluding those
   # that have been included in named sets above.  Or, if --setperdb is
   # specified, place each into its own set.
   if ( !$opts{sets} || $opts{defaultset} ) {
      @databases = grep { $_ !~ m/^(information_schema|lost\+found)$/mi }
         @{$dbh->selectcol_arrayref('SHOW DATABASES')};

      if ( !$opts{sets} ) {
         @databases = grep {
            ( !$opts{d} || exists($opts{d}->{$_}) )
            && ( !$opts{dbregex} || $_ =~ m/$opts{dbregex}/ )
            && ( !exists $opts{g}->{$_} )
         } @databases;
      }

      DATABASE:
      foreach my $database ( @databases ) {
         my $set = $opts{setperdb} ? $database : 'default';
         push @{$databases_for{$set}}, $database;
         @sets_to_do = unique(@sets_to_do, $set);
      }
   }

   # ############################################################################
   # Do each backup set.
   # ############################################################################
   SET:
   foreach my $set ( @sets_to_do ) {

      # Must fetch tables for every set that had databases assigned above.
      # That covers the no --sets case and also the 'default' set created by
      # --defaultset; sets read from --settable have no databases here.
      foreach my $database ( @{ $databases_for{$set} || [] } ) {
         get_tables_in_database(
            dbh            => $dbh,
            set            => $set,
            database       => $database,
            age            => $age,
            opts           => \%opts,
            tables_in_sets => \%tables_in_sets,
            stats_for_set  => \%stats_for_set,
            table_status   => \%table_status,
            tables_for_set => \%tables_for_set,
         );
      }

      if ( !$tables_for_set{$set} || !@{$tables_for_set{$set}} ) {
         info(2, "Nothing to do for set $set");
         next SET;
      }

      my $start = time();
      my $stats = $stats_for_set{$set};

      # #########################################################################
      # Lock tables if needed.  Cycle until there are none to lock or we get the
      # lock (some tables could have been dropped between the time we got the
      # list and now).
      # #########################################################################
      if ( $opts{locktables} && !$opts{test} ) {
         my @to_lock;
         my $done;
         do {
            @to_lock = unique(
               map { "`$_->[0]`.`$_->[1]` READ" } @{$tables_for_set{$set}}
            );
            if ( $backedup_sth ) {
               push @to_lock, "$opts{settable} AS `mysql_parallel_dump_writable` WRITE";
            }
            eval {
               $dbh->do('LOCK TABLES ' . join(', ', @to_lock));
               $done = 1;
            };
            if ( $EVAL_ERROR ) {
               my $err = mysql_error_msg($EVAL_ERROR);
               my ( $db, $tbl ) = $err =~ m/Table '([^.]+)\.([^.]+)' doesn't exist/;
               if ( $db && $tbl ) {
                  # Remove the nonexistent table and try again.
                  $tables_for_set{$set} = [
                     grep { $_->[0] ne $db || $_->[1] ne $tbl }
                     @{$tables_for_set{$set}}
                  ];
                  info(0, $err);
               }
               else {
                  die "Cannot lock tables: $err";
               }
            }
         } while ( @to_lock && !$done );
      }

      # #########################################################################
      # Flush logs.
      # #########################################################################
      if ( $opts{flushlog} && !$opts{test} ) {
         $dbh->do('FLUSH LOGS');
      }

      # One unit of work per chunk of each table.
      my @work_to_do;
      foreach my $db_tbl ( @{$tables_for_set{$set}} ) {
         my @chunks = get_chunks($dbh, $opts{C}, $set, @$db_tbl, \%table_status);
         my $i = 0;
         foreach my $chunk ( @chunks ) {
            push @work_to_do, {
               D => $chunk->{D},
               N => $chunk->{N},
               S => $chunk->{S},
               C => $i++,
               W => $chunk->{W},
            };
            $stats_for_set{$set}->{chunks}++;
         }
      }

      # #########################################################################
      # Get the master position.
      # #########################################################################
      if ( $opts{b} && !$opts{test} ) {
         my $filename = filename($set, '00_master_data.sql');
         makedir($filename);
         open my $file, ">", $filename or die $OS_ERROR;

         my %wanted = map { $_ => 1 }
            qw(file position master_host master_port master_log_file
               read_master_log_pos relay_log_file relay_log_pos
               relay_master_log_file exec_master_log_pos);

         # Both queries are best-effort: the server may not be a master or a
         # slave, or the user may lack the privilege.
         my ( $master_pos, $slave_pos );
         eval {
            $master_pos = $dbh->selectrow_hashref('SHOW MASTER STATUS');
         };
         eval {
            $slave_pos = $dbh->selectrow_hashref('SHOW SLAVE STATUS');
            if ( $slave_pos ) {
               print $file "CHANGE MASTER TO MASTER_HOST='$slave_pos->{master_host}', "
                  . "MASTER_LOG_FILE='$slave_pos->{master_log_file}', "
                  . "MASTER_LOG_POS=$slave_pos->{read_master_log_pos}\n";
            }
         };

         foreach my $thing ( $master_pos, $slave_pos ) {
            next unless $thing;
            foreach my $key ( grep { $wanted{$_} } sort keys %$thing ) {
               print $file "-- $key $thing->{$key}\n";
            }
         }

         # Put the details of the chunks into the file.
         foreach my $chunk ( @work_to_do ) {
            print $file "-- CHUNK $chunk->{D} $chunk->{N} $chunk->{C} $chunk->{W}\n";
         }

         close $file or die $OS_ERROR;
      }

      # #########################################################################
      # Design the format for printing out.
      # #########################################################################
      my ( $maxdb, $maxtbl, $maxset );
      $maxdb  = max(8, map { length($_->{D}) } @work_to_do);
      $maxtbl = max(5, map { length($_->{N}) } @work_to_do);
      $maxset = max(3, length($set));
      my $format = "%-${maxset}s %-${maxdb}s %-${maxtbl}s %5s %5s %6s %7s";
      info(2, sprintf($format, qw(SET DATABASE TABLE CHUNK TIME STATUS THREADS)));

      # #########################################################################
      # Assign the work to child processes.  Initially just start --numthreads
      # number of children.  Each child that exits will trigger a new one to
      # start after that.  This is really a terrible hack -- I wish Perl had
      # decent threading support so I could just queue work for a fixed pool of
      # worker threads!
      # #########################################################################
      my %kids;
      while ( @work_to_do || %kids ) {

         # Wait for the MySQL server to become responsive.
         my $tries = 0;
         while ( !$dbh->ping && $tries++ < $opts{w} ) {
            sleep(1);
            eval {
               $dbh = get_dbh();
            };
            if ( $EVAL_ERROR ) {
               info(0, 'Waiting: ' . scalar(localtime) . ' '
                  . mysql_error_msg($EVAL_ERROR));
            }
         }
         if ( $tries >= $opts{w} ) {
            die "Too many retries, exiting.\n";
         }

         # Start a new child process.
         while ( @work_to_do && $opts{m} > keys %kids ) {
            my $todo = shift @work_to_do;
            $todo->{time} = time;
            my $pid = fork();
            die "Can't fork: $OS_ERROR" unless defined $pid;
            if ( $pid ) {              # I'm the parent
               $kids{$pid} = $todo;
            }
            else {                     # I'm the child
               my $exit_status = 0;
               $exit_status = do_table($todo) || $exit_status;
               exit($exit_status);
            }
         }

         # Possibly wait for child.
         my $reaped = 0;
         foreach my $kid ( keys %exited_children ) {
            my $status = $exited_children{$kid};
            my $todo   = $kids{$kid};
            my $stat   = $status->{exit_status};
            if ( !$opts{test} && !$stat && $backedup_sth ) {
               $backedup_sth->execute(@{$todo}{qw(S D N)});
            }
            my $time = $status->{exit_time} - $todo->{time};
            info(2, sprintf($format, @{$todo}{qw(S D N C)},
               sprintf('%.2f', $time), $stat, scalar(keys %kids)));
            $stats->{ $stat ? 'failure' : 'success' }++;
            $stats->{time} += $time;
            delete $kids{$kid};
            delete $exited_children{$kid};
            $reaped = 1;
         }

         if ( !$reaped ) {
            # Don't busy-wait.  But don't wait forever either, as a child may
            # exit and signal while we're not sleeping, so if we sleep forever
            # we may not get the signal.
            sleep(1);
         }
      }

      if ( $opts{locktables} && !$opts{test} ) {
         $dbh->do('UNLOCK TABLES');
         $dbh->commit;
      }

      $stats->{wallclock} = time() - $start;
      info(1, sprintf(
         (@sets_to_do ? '%12s: ' : '%s:')
         . '%5d tables, %5d chunks, %5d successes, %2d failures, '
         . '%6.2f wall-clock time, %6.2f dump time',
            $set,
            $stats->{tables},
            $stats->{chunks},
            $stats->{success} || 0,
            $stats->{failure} || 0,
            $stats->{wallclock},
            $stats->{time}));
   }

   $dbh->do('UNLOCK TABLES') unless $opts{test};
   $dbh->commit;
   $dbh->disconnect;

   if ( @sets_to_do > 1 ) {
      info(1, sprintf(
         'Final result: %2d sets, %5d tables, %5d chunks, %5d successes, %2d failures, '
         . '%6.2f wall-clock time, %6.2f dump time',
            scalar(@sets_to_do),
            map {
               my $thing = $_;
               sum(0, map { $_->{$thing} || 0 } values %stats_for_set);
            } qw(tables chunks success failure wallclock time)
         ));
   }

   # Exit status is 1 if there were any failures.
   exit( sum(0, map { $_->{failure} || 0 } values %stats_for_set) ? 1 : 0 );
}

# ############################################################################
# Subroutines
# ############################################################################

# Converts a duration such as '30s', '5m', '2h' or '1d' to seconds.  Returns
# undef when the argument is not a positive integer followed by exactly one
# of the suffixes s, m, h or d.
sub duration_to_seconds {
   my ( $spec ) = @_;
   return unless defined $spec;
   my ( $num, $suffix ) = $spec =~ m/^(\d+)([smhd])$/;
   return unless defined $num && $num > 0;
   my %seconds_per = ( s => 1, m => 60, h => 3_600, d => 86_400 );
   return $num * $seconds_per{$suffix};
}

# Shortens a DBI error such as "... failed: MSG at FILE line N" to
# "MSG at line N".  Text that does not look like that is returned unchanged.
sub mysql_error_msg {
   my ( $text ) = @_;
   $text =~ s/^.*?failed: (.*?) at \S+ line (\d+).*$/$1 at line $2/s;
   return $text;
}

# Called when sets aren't retrieved from the settbl or when --defaultset is
# given.  Adds the tables of one database to a set.  Named arguments: dbh,
# set, database, age, opts (hashref of options) and the hashrefs
# tables_in_sets, tables_for_set, stats_for_set and table_status, which are
# updated in place.  Skips views, tables already in a set, tables not
# updated since $age and, when --sets is not in use, tables excluded by the
# table filters.
sub get_tables_in_database {
   my ( %args ) = @_;
   my $dbh            = $args{dbh};
   my $set            = $args{set};
   my $database       = $args{database};
   my $age            = $args{age};
   my %opts           = %{$args{opts}};
   my $tables_in_sets = $args{tables_in_sets};
   my $tables_for_set = $args{tables_for_set};
   my $stats_for_set  = $args{stats_for_set};
   my $table_status   = $args{table_status};

   # The full status is needed for update_time (--age) and for
   # avg_row_length (a --chunksize given as a data size).
   my $need_table_status = $age || $opts{C} =~ m/\D/;

   my $tables = $dbh->selectall_arrayref(
      $need_table_status
         ? "SHOW TABLE STATUS FROM `$database`"
         : "SHOW /*!50002 FULL*/ TABLES FROM `$database`",
      { Slice => {} });

   if ( @$tables ) {
      # SHOW TABLES names its column after the database, so take whichever
      # column is not table_type.
      my ( $name_key ) = $need_table_status
         ? ( qw(name) )
         : ( grep { $_ ne 'table_type' } keys %{$tables->[0]} );
      my $type_key = $need_table_status ? 'comment' : 'table_type';

      TABLE:
      foreach my $table ( @$tables ) {
         my $tblname = $table->{$name_key};

         if ( !$opts{sets} ) {
            next TABLE if
               ( $opts{t} && !exists($opts{t}->{$tblname}) )
               || ( $opts{tblregex} && $tblname !~ m/$opts{tblregex}/ )
               || exists $opts{n}->{$tblname};
         }

         next TABLE if
            ( $tables_in_sets->{$database}->{$tblname} )
            || ( $table->{$type_key} && $table->{$type_key} eq 'VIEW' )
            || ( $table->{update_time} && $age && $table->{update_time} lt $age );

         if ( $need_table_status ) {
            $table_status->{$database}->{$tblname} = $table;
         }

         $stats_for_set->{$set}->{tables}++;
         $tables_in_sets->{$database}->{$tblname}++;
         push @{$tables_for_set->{$set}}, [ $database, $tblname ];
      }
   }
}

# Returns a list of hashrefs { D => db, N => table, S => set, W => where },
# one per chunk of the table.  $spec is the --chunksize value: a number of
# rows, or a size with a k, M or G suffix.  $cache is handed to
# TableChunker::size_to_rows().  A single chunk with W => '1=1' is returned
# when no chunk size was given or the table cannot be chunked.
sub get_chunks {
   my ( $dbh, $spec, $set, $db, $tbl, $cache ) = @_;
   my $rows_per_chunk;
   my $cant_chunk = {
      D => $db,
      N => $tbl,
      S => $set,
      W => '1=1',
   };

   return $cant_chunk unless $spec;
   my $tc = TableChunker->new;

   # Figure out whether the chunksize is a number of rows or a data size.
   my ( $num, $suffix ) = $spec =~ m/^(\d+)([MGk])$/;
   if ( $suffix ) {
      # Figure out how many rows fit into this many bytes
      my $size = $suffix eq 'k' ? 1_024
               : $suffix eq 'M' ? 1_024 * 1_024
               :                  1_024 * 1_024 * 1_024;
      $rows_per_chunk = $tc->size_to_rows($dbh, $db, $tbl, $size * $num, $cache);
      return $cant_chunk unless $rows_per_chunk;
   }
   else {
      $rows_per_chunk = $spec;
   }

   # Get the chunk column candidates
   my $tp    = TableParser->new;
   my $table = $tp->parse($tp->get_ddl($dbh, $db, $tbl));
   my $col   = $tc->get_first_chunkable_column($table);
   return $cant_chunk unless $col;

   my %params = $tc->get_range_statistics($dbh, $db, $tbl, $col);
   return $cant_chunk if grep { !defined $params{$_} } qw(min max rows_in_range);

   my @chunks = $tc->calculate_chunks(
      dbh   => $dbh,
      table => $table,
      col   => $col,
      size  => $rows_per_chunk,
      %params,
   );

   return map {
      {
         D => $db,
         N => $tbl,
         S => $set,
         W => $_,
      }
   } @chunks;
}

# Prints a message when $level does not exceed the --verbose level.
sub info {
   my ( $level, $msg ) = @_;
   if ( $level <= $opts{v} ) {
      print $msg, "\n";
   }
}

# Returns its arguments with duplicates removed, first occurrences kept in
# their original order.
sub unique {
   my %seen;
   grep { !$seen{$_}++ } @_;
}

# Interpolates % directives from a db/tbl hashref.  %S, %D, %N, %C and %W
# are replaced with the matching member of $todo; a number between the '%'
# and the letter, as in %3C, zero-pads the value to that width.  Returns the
# interpolated copies of @strings.
sub interp {
   my ( $todo, @strings ) = @_;
   foreach my $string ( @strings ) {
      $string =~ s{%(\d+)?([SDNCW])}
                  { $1 ? sprintf("%0${1}d", $todo->{$2}) : $todo->{$2} }ge;
   }
   return @strings;
}

# Actually dumps a table.  $todo is one unit of work (keys S, D, N, C, W).
# Returns the exit status of the last command that was run, 0 for success.
# Dies when the SELECT INTO OUTFILE query fails.
sub do_table {
   my ( $todo ) = @_;
   my $exit_status = 0;

   # Dump via SELECT INTO OUTFILE.
   if ( $opts{T} ) {
      my $dbh      = get_dbh();
      my $filename = filename(interp($todo, '%S', '%D', '%N.%3C'));
      makedir($filename);
      my $sql;
      if ( $opts{csv} ) {
         # The WHERE clause limits the dump to this chunk, the same as in
         # the tab-separated branch below.
         $sql = qq{SELECT * INTO OUTFILE '$filename.txt' }
              . qq{FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\\"' }
              . qq{LINES TERMINATED BY '\\n' FROM `$todo->{D}`.`$todo->{N}` }
              . qq{WHERE $todo->{W}};
      }
      else {
         $sql = "SELECT * INTO OUTFILE '$filename.txt' "
              . "FROM `$todo->{D}`.`$todo->{N}` WHERE $todo->{W}";
      }
      if ( $opts{test} ) {
         print $sql, "\n";
      }
      eval {
         $dbh->do($sql) unless $opts{test};
         $dbh->disconnect;
      };
      if ( $EVAL_ERROR ) {
         die mysql_error_msg($EVAL_ERROR) . "\n";
      }
      if ( $opts{gzip} ) {
         $exit_status = system_call(
            'gzip', '--force', '--fast', qq{"$filename.txt"});
      }
   }

   # Normal dump using mysqldump.  If $opts{T} was set, the following won't
   # dump the data because --no-data is in @mysqldump_args.  If the user left
   # the options alone, we can predict the filename and directory naming
   # convention, and ensure the directories exist.  Otherwise the user must
   # ensure the directories exist, because it's a hard job to figure out
   # which argument in the array is a filename.  We only do this on the first
   # chunk for each table.
   if ( $default_dir && !$todo->{C} ) {
      makedir(interp($todo, filename('%S', '%D', '%N.%3C.sql')));
   }

   if ( !$default_dir || !$todo->{C} || !$opts{T} ) {
      # It's either a custom command, or it's the first chunk for a
      # tab-separated table and we're going to dump the schema, or it's a
      # regular SQL dump and we're going to dump data.
      my @args = map { interp($todo, $_) } @mysqldump_args;
      $exit_status = system_call( @args ) || $exit_status;
   }

   return $exit_status;
}

# Makes a filename: joins the path components in @_ under --basedir.
sub filename {
   my $filename = File::Spec->catfile($opts{basedir}, @_);
   return $filename;
}

{ # Memoize...
   my %dirs;

   # If the directory doesn't exist, makes the directory.  Every missing
   # directory leading to $filename is created; directories already handled
   # by this process are remembered in %dirs.  Does nothing under --test.
   # Dies when a directory cannot be created.
   sub makedir {
      my ( $filename ) = @_;
      return if $opts{test};
      my @dirs = File::Spec->splitdir(dirname($filename));
      foreach my $i ( 0 .. $#dirs ) {
         my $dir = File::Spec->catdir(@dirs[0 .. $i]);
         if ( !$dirs{$dir} ) {
            if ( ! -d $dir ) {
               # Another child process may create the directory at the same
               # moment, so a failed mkdir() is an error only when the
               # directory still does not exist afterwards.
               mkdir($dir, 0777)
                  or -d $dir
                  or die "Cannot create directory $dir: $OS_ERROR\n";
            }
            $dirs{$dir}++;
         }
      }
   }
}

# Opens and returns a new database handle, using the connection options in
# %opts.  The handle has AutoCommit off and RaiseError on, so a failed
# connection dies.
sub get_dbh {
   my $dsn = 'DBI:mysql:;'
      . join(';', map  { "$conn{$_}=$opts{$_}" }
                  grep { defined $opts{$_} }
                  qw(F h P S))
      . ';mysql_read_default_group=mysql';
   my $dbh = DBI->connect($dsn, @opts{qw(u p)},
      { AutoCommit => 0, RaiseError => 1, PrintError => 0 } );
   return $dbh;
}

sub system_call {
   my ( @cmd ) = @_;
   my $exit_status = 0;
   if ( $opts{test} ) {
      print join(' ', @cmd), "\n";
   }
   else {
      $exit_status = system(join(' ', @cmd));
      # Must right-shift to get the actual exit status of the command.
      # Otherwise the upstream exit() call that's about to happen will get a
      # larger value than it likes, and will just report zero to waitpid().
$exit_status = $exit_status >> 8; } return $exit_status; } 1; # Because this is a runnable module. # ############################################################################ # Documentation. # ############################################################################ =pod =head1 NAME mysql-parallel-dump - Dump sets of MySQL tables in parallel. =head1 SYNOPSIS mysql-parallel-dump mysql-parallel-dump --tab --basedir /path/to/backups/ mysql-parallel-dump --sets order,profile,session --settable meta.backupset =head1 DESCRIPTION MySQL Parallel Dump connects to a MySQL server, finds database and table names, and dumps them in parallel for speed. It can be used in several pre-packaged ways, or as a generic wrapper to call some program in parallel, passing it parameters for each table. It supports backup sets and dumping only tables that have changed since the last dump. To dump all tables to gzipped files in the current directory, each database with its own directory, with a global read lock, flushing and recording binary log positions, each table in a single file: mysql-parallel-dump To dump tables elsewhere: mysql-parallel-dump --basedir /path/to/elsewhere To dump to tab-separated files with C cannot be directed to a pipe). =item --help Displays a help message. =item --host Connect to host. =item --ignoredb Do not dump this comma-separated list of databases. Only applies when L<"--sets"> is not given. =item --ignoretbl Do not dump this comma-separated list of table (not database.table) names. Only applies when L<"--sets"> is not given. =item --locktables Disables L<"--flushlock"> (unless it was explicitly set) and locks tables with C. Enabled by default when L<"--sets"> is specified. The lock is taken and released with every set of tables dumped. =item --numthread Specifies the number of parallel processes to run. The default is 2 (this is MySQL Parallel Dump, after all -- 1 is not parallel). 
On GNU/Linux machines, the default is the number of times 'processor' appears
in F</proc/cpuinfo>.  On Windows, the default is read from the environment.  In
any case, the default is at least 2, even when there's only a single processor.

=item --opt

This is sort of related to C<mysqldump>'s C<--opt> argument, I<but not really>.
It does I<not> pass C<--opt> to C<mysqldump>.  Instead, it passes the
following:

C<--add-drop-table> C<--add-locks> C<--allow-keywords> C<--comments>
C<--complete-insert> C<--create-options> C<--disable-keys>
C<--extended-insert> C<--quick> C<--quote-names> C<--set-charset>
C<--skip-lock-tables> C<--triggers> C<--tz-utc>

These are what I consider to be sensible default options.

=item --password

Password to use when connecting.

=item --port

Port number to use for connection.

=item --quiet

Sets L<"--verbose"> to 0.

=item --sets

Dump this comma-separated list of backup sets, in order.  Requires
L<"--settable">.  See L<"BACKUP SETS">.

The special C<default> set is reserved; don't use it as a set name.

=item --setperdb

Specifies that each database is a separate backup set.  Each set is named the
same as the database.  Implies L<"--locktables">.

=item --settable

Specifies the table in which backup sets are kept.  It may be given in
database.table form.

=item --socket

Socket file to use for connection.

=item --tab

Dump via C<SELECT INTO OUTFILE>, which is similar to what C<mysqldump> does
with the C<--tab> option, but you're not constrained to a single database at a
time.

Before you use this option, make sure you know what C<SELECT INTO OUTFILE>
does!  I recommend using it only if you're running MySQL Parallel Dump on the
same machine as the MySQL server, but there is no protection if you don't.

The files will be gzipped after dumping if L<"--gzip"> is enabled.  This option
sets L<"--umask"> to zero so auto-created directories are writable by the MySQL
server.

=item --tables

Dump this comma-separated list of table (not database.table) names.  Only
applies when L<"--sets"> is not given.

=item --tblregex

Dump only tables whose names match this Perl regular expression.  Only applies
when L<"--sets"> is not given.

=item --test

Print commands instead of executing them.

=item --umask

Set the program's C<umask> to this octal value.
This is useful when you want created files and directories to be readable or
writable by other users (for example, the MySQL server itself).

=item --user

User for login if not current user.

=item --verbose

Sets the verbosity; repeatedly specifying it increments the verbosity.  Default
is 1 if not specified.  See L<"OUTPUT">.

=item --version

Output version information and exit.

=item --wait

If the MySQL server crashes during dumping, waits until the server comes back
and then continues with the rest of the tables.  The value is a number with a
suffix (s=seconds, m=minutes, h=hours, d=days).  MySQL Parallel Dump will check
the server every second until this time is exhausted, at which point it will
give up and exit.

This implements Peter Zaitsev's "safe dump" request: sometimes a dump on a
server that has corrupt data will kill the server.  MySQL Parallel Dump will
wait for the server to restart, then keep going.  It's hard to say which table
killed the server, so no tables will be retried.  Tables that were being
concurrently dumped when the crash happened will not be retried.  No additional
locks will be taken after the server restarts; it's assumed this behavior is
useful only on a server you're not trying to dump while it's in production.

=back

=head1 SYSTEM REQUIREMENTS

You need Perl, DBI, DBD::mysql, and some core packages that ought to be
installed in any reasonably new version of Perl.

=head1 BUGS

Please use the Sourceforge bug tracker, forums, and mailing lists to request
support or report bugs: L<http://sourceforge.net/projects/mysqltoolkit/>.

=head1 COPYRIGHT, LICENSE AND WARRANTY

This program is copyright (c) 2007 Baron Schwartz.  Feedback and improvements
are welcome.

THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, version 2; OR the Perl Artistic License.  On UNIX and similar
systems, you can issue `man perlgpl' or `man perlartistic' to read these
licenses.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc., 59 Temple
Place, Suite 330, Boston, MA  02111-1307  USA.

=head1 AUTHOR

Baron Schwartz.

=head1 SEE ALSO

See also L<mysql-parallel-restore>.

=head1 VERSION

This manual page documents Ver 0.9.10 Distrib 1053 $Revision: 1046 $.

=cut