diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index c63bf2f82254af8dfe681330d133d90aefb9d475..3d9b7f926232ad7b59167d70e3acb0c2824ebcea 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -16,6 +16,7 @@ build-system2: script: - apt-get install -y -qq libsdl2-dev libgcrypt-dev libbrlapi-dev libaio-dev libfdt-dev liblzo2-dev librdmacm-dev libibverbs-dev libibumad-dev + libzstd-dev - ./configure --enable-werror --target-list="tricore-softmmu unicore32-softmmu microblaze-softmmu mips-softmmu riscv32-softmmu s390x-softmmu sh4-softmmu sparc64-softmmu x86_64-softmmu xtensa-softmmu nios2-softmmu or1k-softmmu" diff --git a/.travis.yml b/.travis.yml index caf0a1f8faf41d827a4ea280c1d2a336897084e1..f3fe04fba938edbdd6aed3a4c8e952b03e87061e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -35,6 +35,7 @@ addons: - liburcu-dev - libusb-1.0-0-dev - libvte-2.91-dev + - libzstd-dev - sparse - uuid-dev - gcovr diff --git a/configure b/configure index 714e7fb6a1fbd72ccdcd92b506cedf68b80d8a5c..577533e9ed9919f1e0cd8ac7fe6b23e58ec75f1f 100755 --- a/configure +++ b/configure @@ -446,6 +446,7 @@ lzo="" snappy="" bzip2="" lzfse="" +zstd="" guest_agent="" guest_agent_with_vss="no" guest_agent_ntddscsi="no" @@ -1358,6 +1359,10 @@ for opt do ;; --disable-lzfse) lzfse="no" ;; + --disable-zstd) zstd="no" + ;; + --enable-zstd) zstd="yes" + ;; --enable-guest-agent) guest_agent="yes" ;; --disable-guest-agent) guest_agent="no" @@ -1812,6 +1817,8 @@ disabled with --disable-FEATURE, default is enabled if available: (for reading bzip2-compressed dmg images) lzfse support of lzfse compression library (for reading lzfse-compressed dmg images) + zstd support for zstd compression library + (for migration compression) seccomp seccomp support coroutine-pool coroutine freelist (better performance) glusterfs GlusterFS backend @@ -2407,6 +2414,24 @@ EOF fi fi +########################################## +# zstd check + +if test "$zstd" != "no" ; then + if $pkg_config --exist libzstd ; then + zstd_cflags="$($pkg_config --cflags libzstd)" + zstd_libs="$($pkg_config --libs libzstd)" + LIBS="$zstd_libs $LIBS" + QEMU_CFLAGS="$QEMU_CFLAGS $zstd_cflags" + zstd="yes" + else + if test "$zstd" = "yes" ; then + feature_not_found "libzstd" "Install libzstd devel" + fi + zstd="no" + fi +fi + ########################################## # libseccomp check @@ -6460,6 +6485,7 @@ echo "lzo support $lzo" echo "snappy support $snappy" echo "bzip2 support $bzip2" echo "lzfse support $lzfse" +echo "zstd support $zstd" echo "NUMA host support $numa" echo "libxml2 $libxml2" echo "tcmalloc support $tcmalloc" @@ -7024,6 +7050,10 @@ if test "$lzfse" = "yes" ; then echo "LZFSE_LIBS=-llzfse" >> $config_host_mak fi +if test "$zstd" = "yes" ; then + echo "CONFIG_ZSTD=y" >> $config_host_mak +fi + if test "$libiscsi" = "yes" ; then echo "CONFIG_LIBISCSI=m" >> $config_host_mak echo "LIBISCSI_CFLAGS=$libiscsi_cflags" >> $config_host_mak diff --git a/docs/multi-thread-compression.txt b/docs/multi-thread-compression.txt index bb88c6bdf11c25ad9ae9c16c5838e41162fe410f..d429963cb04462d3f734a7dfd529178dbf6e858d 100644 --- a/docs/multi-thread-compression.txt +++ b/docs/multi-thread-compression.txt @@ -33,14 +33,15 @@ thread compression can be used to accelerate the compression process. The decompression speed of Zlib is at least 4 times as quick as compression, if the source and destination CPU have equal speed, -keeping the compression thread count 4 times the decompression -thread count can avoid resource waste. +and you choose Zlib as compression method, keeping the compression +thread count 4 times the decompression thread count can avoid resource waste. Compression level can be used to control the compression speed and the -compression ratio. High compression ratio will take more time, level 0 -stands for no compression, level 1 stands for the best compression -speed, and level 9 stands for the best compression ratio. Users can -select a level number between 0 and 9. +compression ratio. High compression ratio will take more time, +level 1 stands for the best compression speed, and higher level means higher +compression ration. For Zlib, users can select a level number between 0 and 9, +where level 0 stands for no compression. For Zstd, users can select a +level number between 1 and 22. When to use the multiple thread compression in live migration @@ -116,16 +117,19 @@ to support the multiple thread compression migration: 2. Activate compression on the source: {qemu} migrate_set_capability compress on -3. Set the compression thread count on source: +3. Set the compression method: + {qemu} migrate_set_parameter compress_method zstd + +4. Set the compression thread count on source: {qemu} migrate_set_parameter compress_threads 12 -4. Set the compression level on the source: +5. Set the compression level on the source: {qemu} migrate_set_parameter compress_level 1 -5. Set the decompression thread count on destination: +6. Set the decompression thread count on destination: {qemu} migrate_set_parameter decompress_threads 3 -6. Start outgoing migration: +7. Start outgoing migration: {qemu} migrate -d tcp:destination.host:4444 {qemu} info migrate Capabilities: ... compress: on @@ -136,6 +140,7 @@ The following are the default settings: compress_threads: 8 decompress_threads: 2 compress_level: 1 (which means best speed) + compress_method: zlib So, only the first two steps are required to use the multiple thread compression in migration. You can do more if the default @@ -143,7 +148,7 @@ settings are not appropriate. TODO ==== -Some faster (de)compression method such as LZ4 and Quicklz can help -to reduce the CPU consumption when doing (de)compression. If using -these faster (de)compression method, less (de)compression threads +Comparing to Zlib, Some faster (de)compression method such as LZ4 +and Quicklz can help to reduce the CPU consumption when doing (de)compression. +If using these faster (de)compression method, less (de)compression threads are needed when doing the migration. diff --git a/hw/core/qdev-prop-internal.h b/hw/core/qdev-prop-internal.h new file mode 100644 index 0000000000000000000000000000000000000000..a4a7eaf078b26846f9e0241c39211bf78dc6df26 --- /dev/null +++ b/hw/core/qdev-prop-internal.h @@ -0,0 +1,18 @@ +/* + * qdev property parsing + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + */ + +#ifndef HW_CORE_QDEV_PROP_INTERNAL_H +#define HW_CORE_QDEV_PROP_INTERNAL_H + +void get_enum(Object *obj, Visitor *v, const char *name, void *opaque, + Error **errp); +void set_enum(Object *obj, Visitor *v, const char *name, void *opaque, + Error **errp); + +void set_default_value_enum(Object *obj, const Property *prop); + +#endif diff --git a/hw/core/qdev-properties-system.c b/hw/core/qdev-properties-system.c index ba412dd2cabea0961b84443c17afb65e066a9c3d..6d48903c87124d1460d28d7ec8a562341fa258bf 100644 --- a/hw/core/qdev-properties-system.c +++ b/hw/core/qdev-properties-system.c @@ -15,6 +15,7 @@ #include "hw/qdev.h" #include "qapi/error.h" #include "qapi/qmp/qerror.h" +#include "qapi/qapi-types-migration.h" #include "sysemu/block-backend.h" #include "sysemu/blockdev.h" #include "hw/block/block.h" @@ -23,6 +24,7 @@ #include "chardev/char-fe.h" #include "sysemu/iothread.h" #include "sysemu/tpm_backend.h" +#include "qdev-prop-internal.h" static void get_pointer(Object *obj, Visitor *v, Property *prop, char *(*print)(void *ptr), @@ -399,3 +401,14 @@ void qdev_set_nic_properties(DeviceState *dev, NICInfo *nd) } nd->instantiated = 1; } + +/* --- CompressMethod --- */ +const PropertyInfo qdev_prop_compress_method = { + .name = "CompressMethod", + .description = "multi-thread compression method, " + "zlib/zstd", + .enum_table = &CompressMethod_lookup, + .get = get_enum, + .set = set_enum, + .set_default_value = set_default_value_enum, +}; diff --git a/hw/core/qdev-properties.c b/hw/core/qdev-properties.c index 81c97f48a7f09fea32ae6c5e491d78a960d6ac7a..709f9e0f9d8aa75f9d96ae96ef004fe77dc65fb2 100644 --- a/hw/core/qdev-properties.c +++ b/hw/core/qdev-properties.c @@ -11,6 +11,7 @@ #include "qapi/visitor.h" #include "chardev/char.h" #include "qemu/uuid.h" +#include "qdev-prop-internal.h" void qdev_prop_set_after_realize(DeviceState *dev, const char *name, Error **errp) @@ -46,7 +47,7 @@ void *qdev_get_prop_ptr(DeviceState *dev, Property *prop) return ptr; } -static void get_enum(Object *obj, Visitor *v, const char *name, void *opaque, +void get_enum(Object *obj, Visitor *v, const char *name, void *opaque, Error **errp) { DeviceState *dev = DEVICE(obj); @@ -56,7 +57,7 @@ static void get_enum(Object *obj, Visitor *v, const char *name, void *opaque, visit_type_enum(v, prop->name, ptr, prop->info->enum_table, errp); } -static void set_enum(Object *obj, Visitor *v, const char *name, void *opaque, +void set_enum(Object *obj, Visitor *v, const char *name, void *opaque, Error **errp) { DeviceState *dev = DEVICE(obj); @@ -71,7 +72,7 @@ static void set_enum(Object *obj, Visitor *v, const char *name, void *opaque, visit_type_enum(v, prop->name, ptr, prop->info->enum_table, errp); } -static void set_default_value_enum(Object *obj, const Property *prop) +void set_default_value_enum(Object *obj, const Property *prop) { object_property_set_str(obj, qapi_enum_lookup(prop->info->enum_table, @@ -79,6 +80,13 @@ static void set_default_value_enum(Object *obj, const Property *prop) prop->name, &error_abort); } +const PropertyInfo qdev_prop_enum = { + .name = "enum", + .get = get_enum, + .set = set_enum, + .set_default_value = set_default_value_enum, +}; + /* Bit */ static uint32_t qdev_get_prop_mask(Property *prop) diff --git a/include/hw/qdev-properties.h b/include/hw/qdev-properties.h index 1eae5ab056d08599f754a1fb9c1d4644d354e04d..a22a532eb8984dc3f109b6648708025286344d4e 100644 --- a/include/hw/qdev-properties.h +++ b/include/hw/qdev-properties.h @@ -23,6 +23,7 @@ extern const PropertyInfo qdev_prop_tpm; extern const PropertyInfo qdev_prop_ptr; extern const PropertyInfo qdev_prop_macaddr; extern const PropertyInfo qdev_prop_on_off_auto; +extern const PropertyInfo qdev_prop_compress_method; extern const PropertyInfo qdev_prop_losttickpolicy; extern const PropertyInfo qdev_prop_blockdev_on_error; extern const PropertyInfo qdev_prop_bios_chs_trans; @@ -205,6 +206,9 @@ extern const PropertyInfo qdev_prop_pcie_link_width; DEFINE_PROP(_n, _s, _f, qdev_prop_macaddr, MACAddr) #define DEFINE_PROP_ON_OFF_AUTO(_n, _s, _f, _d) \ DEFINE_PROP_SIGNED(_n, _s, _f, _d, qdev_prop_on_off_auto, OnOffAuto) +#define DEFINE_PROP_COMPRESS_METHOD(_n, _s, _f, _d) \ + DEFINE_PROP_SIGNED(_n, _s, _f, _d, qdev_prop_compress_method, \ + CompressMethod) #define DEFINE_PROP_LOSTTICKPOLICY(_n, _s, _f, _d) \ DEFINE_PROP_SIGNED(_n, _s, _f, _d, qdev_prop_losttickpolicy, \ LostTickPolicy) diff --git a/migration/migration.c b/migration/migration.c index 0e396f22b44ba09e04ed40e2a6b028016a36ae8d..17a5c16c79bddaac3866e3805cbbf23b9003f51e 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -71,6 +71,7 @@ #define DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT 2 /*0: means nocompress, 1: best speed, ... 9: best compress ratio */ #define DEFAULT_MIGRATE_COMPRESS_LEVEL 1 +#define DEFAULT_MIGRATE_COMPRESS_METHOD COMPRESS_METHOD_ZLIB /* Define default autoconverge cpu throttle migration parameters */ #define DEFAULT_MIGRATE_CPU_THROTTLE_INITIAL 20 #define DEFAULT_MIGRATE_CPU_THROTTLE_INCREMENT 10 @@ -748,6 +749,8 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp) params->compress_wait_thread = s->parameters.compress_wait_thread; params->has_decompress_threads = true; params->decompress_threads = s->parameters.decompress_threads; + params->has_compress_method = true; + params->compress_method = s->parameters.compress_method; params->has_cpu_throttle_initial = true; params->cpu_throttle_initial = s->parameters.cpu_throttle_initial; params->has_cpu_throttle_increment = true; @@ -1108,16 +1111,40 @@ void qmp_migrate_set_capabilities(MigrationCapabilityStatusList *params, } } +static bool compress_level_check(MigrationParameters *params, Error **errp) +{ + switch (params->compress_method) { + case COMPRESS_METHOD_ZLIB: + if (params->compress_level > 9 || params->compress_level < 1) { + error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "compress_level", + "a value in the range of 0 to 9 for Zlib method"); + return false; + } + break; +#ifdef CONFIG_ZSTD + case COMPRESS_METHOD_ZSTD: + if (params->compress_level > 19 || params->compress_level < 1) { + error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "compress_level", + "a value in the range of 1 to 19 for Zstd method"); + return false; + } + break; +#endif + default: + error_setg(errp, "Checking compress_level failed for unknown reason"); + return false; + } + + return true; +} + /* * Check whether the parameters are valid. Error will be put into errp * (if provided). Return true if valid, otherwise false. */ static bool migrate_params_check(MigrationParameters *params, Error **errp) { - if (params->has_compress_level && - (params->compress_level > 9)) { - error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "compress_level", - "is invalid, it should be in the range of 0 to 9"); + if (params->has_compress_level && !compress_level_check(params, errp)) { return false; } @@ -1250,6 +1277,10 @@ static void migrate_params_test_apply(MigrateSetParameters *params, dest->decompress_threads = params->decompress_threads; } + if (params->has_compress_method) { + dest->compress_method = params->compress_method; + } + if (params->has_cpu_throttle_initial) { dest->cpu_throttle_initial = params->cpu_throttle_initial; } @@ -1331,6 +1362,10 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp) s->parameters.decompress_threads = params->decompress_threads; } + if (params->has_compress_method) { + s->parameters.compress_method = params->compress_method; + } + if (params->has_cpu_throttle_initial) { s->parameters.cpu_throttle_initial = params->cpu_throttle_initial; } @@ -2132,6 +2167,15 @@ int migrate_decompress_threads(void) return s->parameters.decompress_threads; } +CompressMethod migrate_compress_method(void) +{ + MigrationState *s; + + s = migrate_get_current(); + + return s->parameters.compress_method; +} + bool migrate_dirty_bitmaps(void) { MigrationState *s; @@ -3436,6 +3480,9 @@ static Property migration_properties[] = { DEFINE_PROP_UINT8("x-decompress-threads", MigrationState, parameters.decompress_threads, DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT), + DEFINE_PROP_COMPRESS_METHOD("compress-method", MigrationState, + parameters.compress_method, + DEFAULT_MIGRATE_COMPRESS_METHOD), DEFINE_PROP_UINT8("x-cpu-throttle-initial", MigrationState, parameters.cpu_throttle_initial, DEFAULT_MIGRATE_CPU_THROTTLE_INITIAL), @@ -3535,6 +3582,7 @@ static void migration_instance_init(Object *obj) params->has_compress_level = true; params->has_compress_threads = true; params->has_decompress_threads = true; + params->has_compress_method = true; params->has_cpu_throttle_initial = true; params->has_cpu_throttle_increment = true; params->has_max_bandwidth = true; diff --git a/migration/migration.h b/migration/migration.h index f2bd4ebe3327e416be8e282c5d308381fc639c00..4aa72297fc7af8b1aa7fc98b427e4b198b805ff9 100644 --- a/migration/migration.h +++ b/migration/migration.h @@ -319,6 +319,7 @@ int migrate_compress_level(void); int migrate_compress_threads(void); int migrate_compress_wait_thread(void); int migrate_decompress_threads(void); +CompressMethod migrate_compress_method(void); bool migrate_use_events(void); bool migrate_postcopy_blocktime(void); diff --git a/migration/qemu-file.c b/migration/qemu-file.c index cd96d04e9a4a786afb454301cfa88e17fd45f4d2..3bba694ed4eecea097401c51a31c8eb77fe2b330 100644 --- a/migration/qemu-file.c +++ b/migration/qemu-file.c @@ -382,6 +382,15 @@ static void add_to_iovec(QEMUFile *f, const uint8_t *buf, size_t size, } } +static void add_buf_to_iovec(QEMUFile *f, size_t len) +{ + add_to_iovec(f, f->buf + f->buf_index, len, false); + f->buf_index += len; + if (f->buf_index == IO_BUF_SIZE) { + qemu_fflush(f); + } +} + void qemu_put_buffer_async(QEMUFile *f, const uint8_t *buf, size_t size, bool may_free) { @@ -686,72 +695,6 @@ uint64_t qemu_get_be64(QEMUFile *f) return v; } -/* return the size after compression, or negative value on error */ -static int qemu_compress_data(z_stream *stream, uint8_t *dest, size_t dest_len, - const uint8_t *source, size_t source_len) -{ - int err; - - err = deflateReset(stream); - if (err != Z_OK) { - return -1; - } - - stream->avail_in = source_len; - stream->next_in = (uint8_t *)source; - stream->avail_out = dest_len; - stream->next_out = dest; - - err = deflate(stream, Z_FINISH); - if (err != Z_STREAM_END) { - return -1; - } - - return stream->next_out - dest; -} - -/* Compress size bytes of data start at p and store the compressed - * data to the buffer of f. - * - * When f is not writable, return -1 if f has no space to save the - * compressed data. - * When f is wirtable and it has no space to save the compressed data, - * do fflush first, if f still has no space to save the compressed - * data, return -1. - */ -ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream, - const uint8_t *p, size_t size) -{ - ssize_t blen = IO_BUF_SIZE - f->buf_index - sizeof(int32_t); - - if (blen < compressBound(size)) { - if (!qemu_file_is_writable(f)) { - return -1; - } - qemu_fflush(f); - blen = IO_BUF_SIZE - sizeof(int32_t); - if (blen < compressBound(size)) { - return -1; - } - } - - blen = qemu_compress_data(stream, f->buf + f->buf_index + sizeof(int32_t), - blen, p, size); - if (blen < 0) { - return -1; - } - - qemu_put_be32(f, blen); - if (f->ops->writev_buffer) { - add_to_iovec(f, f->buf + f->buf_index, blen, false); - } - f->buf_index += blen; - if (f->buf_index == IO_BUF_SIZE) { - qemu_fflush(f); - } - return blen + sizeof(int32_t); -} - /* Put the data in the buffer of f_src to the buffer of f_des, and * then reset the buf_index of f_src to 0. */ @@ -811,3 +754,15 @@ void qemu_file_set_blocking(QEMUFile *f, bool block) f->ops->set_blocking(f->opaque, block); } } + +ssize_t qemu_put_compress_start(QEMUFile *f, uint8_t **dest_ptr) +{ + *dest_ptr = f->buf + f->buf_index + sizeof(int32_t); + return IO_BUF_SIZE - f->buf_index - sizeof(int32_t); +} + +void qemu_put_compress_end(QEMUFile *f, unsigned int v) +{ + qemu_put_be32(f, v); + add_buf_to_iovec(f, v); +} diff --git a/migration/qemu-file.h b/migration/qemu-file.h index 5de9fa2e96568da4eaba954d8a5b5851adf5cb0c..6570e53e13cfb1e2f75737d123d2d55be341a36f 100644 --- a/migration/qemu-file.h +++ b/migration/qemu-file.h @@ -134,8 +134,6 @@ bool qemu_file_is_writable(QEMUFile *f); size_t qemu_peek_buffer(QEMUFile *f, uint8_t **buf, size_t size, size_t offset); size_t qemu_get_buffer_in_place(QEMUFile *f, uint8_t **buf, size_t size); -ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream, - const uint8_t *p, size_t size); int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src); /* @@ -162,6 +160,8 @@ void ram_control_before_iterate(QEMUFile *f, uint64_t flags); void ram_control_after_iterate(QEMUFile *f, uint64_t flags); void ram_control_load_hook(QEMUFile *f, uint64_t flags, void *data); +ssize_t qemu_put_compress_start(QEMUFile *f, uint8_t **dest_ptr); +void qemu_put_compress_end(QEMUFile *f, unsigned int v); /* Whenever this is found in the data stream, the flags * will be passed to ram_control_load_hook in the incoming-migration * side. This lets before_ram_iterate/after_ram_iterate add diff --git a/migration/ram.c b/migration/ram.c index 92ce1a53e7a4a08bdd237d65ce45346af882478d..ba1e729c395f7d3b67c1fda9c356eae390ffbed4 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -59,6 +59,10 @@ #include "savevm.h" #include "qemu/iov.h" +#ifdef CONFIG_ZSTD +#include +#include +#endif /***********************************************************/ /* ram save/restore */ @@ -415,8 +419,16 @@ struct CompressParam { ram_addr_t offset; /* internally used fields */ - z_stream stream; uint8_t *originbuf; + + /* for zlib compression */ + z_stream stream; + +#ifdef CONFIG_ZSTD + ZSTD_CStream *zstd_cs; + ZSTD_inBuffer in; + ZSTD_outBuffer out; +#endif }; typedef struct CompressParam CompressParam; @@ -428,12 +440,34 @@ struct DecompressParam { void *des; uint8_t *compbuf; int len; + + /* for zlib compression */ z_stream stream; +#ifdef CONFIG_ZSTD + ZSTD_DStream *zstd_ds; + ZSTD_inBuffer in; + ZSTD_outBuffer out; +#endif }; typedef struct DecompressParam DecompressParam; +typedef struct { + int (*save_setup)(CompressParam *param); + void (*save_cleanup)(CompressParam *param); + ssize_t (*compress_data)(CompressParam *param, size_t size); +} MigrationCompressOps; + +typedef struct { + int (*load_setup)(DecompressParam *param); + void (*load_cleanup)(DecompressParam *param); + int (*decompress_data)(DecompressParam *param, uint8_t *dest, size_t size); + int (*check_len)(int len); +} MigrationDecompressOps; + static CompressParam *comp_param; static QemuThread *compress_threads; +static MigrationCompressOps *compress_ops; +static MigrationDecompressOps *decompress_ops; /* comp_done_cond is used to wake up the migration thread when * one of the compression threads has finished the compression. * comp_done_lock is used to co-work with comp_done_cond. @@ -449,26 +483,285 @@ static QemuThread *decompress_threads; static QemuMutex decomp_done_lock; static QemuCond decomp_done_cond; -static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, - ram_addr_t offset, uint8_t *source_buf); +static bool do_compress_ram_page(CompressParam *param, RAMBlock *block); + +static int zlib_save_setup(CompressParam *param) +{ + if (deflateInit(¶m->stream, + migrate_compress_level()) != Z_OK) { + return -1; + } + + return 0; +} + +static ssize_t zlib_compress_data(CompressParam *param, size_t size) +{ + int err; + uint8_t *dest = NULL; + z_stream *stream = ¶m->stream; + uint8_t *p = param->originbuf; + QEMUFile *f = f = param->file; + ssize_t blen = qemu_put_compress_start(f, &dest); + + if (blen < compressBound(size)) { + return -1; + } + + err = deflateReset(stream); + if (err != Z_OK) { + return -1; + } + + stream->avail_in = size; + stream->next_in = p; + stream->avail_out = blen; + stream->next_out = dest; + + err = deflate(stream, Z_FINISH); + if (err != Z_STREAM_END) { + return -1; + } + + blen = stream->next_out - dest; + if (blen < 0) { + return -1; + } + + qemu_put_compress_end(f, blen); + return blen + sizeof(int32_t); +} + +static void zlib_save_cleanup(CompressParam *param) +{ + deflateEnd(¶m->stream); +} + +static int zlib_load_setup(DecompressParam *param) +{ + if (inflateInit(¶m->stream) != Z_OK) { + return -1; + } + + return 0; +} + +static int +zlib_decompress_data(DecompressParam *param, uint8_t *dest, size_t size) +{ + int err; + + z_stream *stream = ¶m->stream; + + err = inflateReset(stream); + if (err != Z_OK) { + return -1; + } + + stream->avail_in = param->len; + stream->next_in = param->compbuf; + stream->avail_out = size; + stream->next_out = dest; + + err = inflate(stream, Z_NO_FLUSH); + if (err != Z_STREAM_END) { + return -1; + } + + return stream->total_out; +} + +static void zlib_load_cleanup(DecompressParam *param) +{ + inflateEnd(¶m->stream); +} + +static int zlib_check_len(int len) +{ + return len < 0 || len > compressBound(TARGET_PAGE_SIZE); +} + +#ifdef CONFIG_ZSTD +static int zstd_save_setup(CompressParam *param) +{ + int res; + param->zstd_cs = ZSTD_createCStream(); + if (!param->zstd_cs) { + return -1; + } + res = ZSTD_initCStream(param->zstd_cs, migrate_compress_level()); + if (ZSTD_isError(res)) { + return -1; + } + return 0; +} +static void zstd_save_cleanup(CompressParam *param) +{ + ZSTD_freeCStream(param->zstd_cs); + param->zstd_cs = NULL; +} +static ssize_t zstd_compress_data(CompressParam *param, size_t size) +{ + int ret; + uint8_t *dest = NULL; + uint8_t *p = param->originbuf; + QEMUFile *f = f = param->file; + ssize_t blen = qemu_put_compress_start(f, &dest); + if (blen < ZSTD_compressBound(size)) { + return -1; + } + param->out.dst = dest; + param->out.size = blen; + param->out.pos = 0; + param->in.src = p; + param->in.size = size; + param->in.pos = 0; + do { + ret = ZSTD_compressStream2(param->zstd_cs, ¶m->out, + ¶m->in, ZSTD_e_end); + } while (ret > 0 && (param->in.size - param->in.pos > 0) + && (param->out.size - param->out.pos > 0)); + if (ret > 0 && (param->in.size - param->in.pos > 0)) { + return -1; + } + if (ZSTD_isError(ret)) { + return -1; + } + blen = param->out.pos; + qemu_put_compress_end(f, blen); + return blen + sizeof(int32_t); +} + +static int zstd_load_setup(DecompressParam *param) +{ + int ret; + param->zstd_ds = ZSTD_createDStream(); + if (!param->zstd_ds) { + return -1; + } + ret = ZSTD_initDStream(param->zstd_ds); + if (ZSTD_isError(ret)) { + return -1; + } + return 0; +} +static void zstd_load_cleanup(DecompressParam *param) +{ + ZSTD_freeDStream(param->zstd_ds); + param->zstd_ds = NULL; +} +static int +zstd_decompress_data(DecompressParam *param, uint8_t *dest, size_t size) +{ + int ret; + param->out.dst = dest; + param->out.size = size; + param->out.pos = 0; + param->in.src = param->compbuf; + param->in.size = param->len; + param->in.pos = 0; + do { + ret = ZSTD_decompressStream(param->zstd_ds, ¶m->out, ¶m->in); + } while (ret > 0 && (param->in.size - param->in.pos > 0) + && (param->out.size - param->out.pos > 0)); + if (ret > 0 && (param->in.size - param->in.pos > 0)) { + return -1; + } + if (ZSTD_isError(ret)) { + return -1; + } + return ret; +} +static int zstd_check_len(int len) +{ + return len < 0 || len > ZSTD_compressBound(TARGET_PAGE_SIZE); +} +#endif + +static int set_compress_ops(void) +{ + compress_ops = g_new0(MigrationCompressOps, 1); + + switch (migrate_compress_method()) { + case COMPRESS_METHOD_ZLIB: + compress_ops->save_setup = zlib_save_setup; + compress_ops->save_cleanup = zlib_save_cleanup; + compress_ops->compress_data = zlib_compress_data; + break; +#ifdef CONFIG_ZSTD + case COMPRESS_METHOD_ZSTD: + compress_ops->save_setup = zstd_save_setup; + compress_ops->save_cleanup = zstd_save_cleanup; + compress_ops->compress_data = zstd_compress_data; + break; +#endif + default: + return -1; + } + + return 0; +} + +static int set_decompress_ops(void) +{ + decompress_ops = g_new0(MigrationDecompressOps, 1); + + switch (migrate_compress_method()) { + case COMPRESS_METHOD_ZLIB: + decompress_ops->load_setup = zlib_load_setup; + decompress_ops->load_cleanup = zlib_load_cleanup; + decompress_ops->decompress_data = zlib_decompress_data; + decompress_ops->check_len = zlib_check_len; + break; +#ifdef CONFIG_ZSTD + case COMPRESS_METHOD_ZSTD: + decompress_ops->load_setup = zstd_load_setup; + decompress_ops->load_cleanup = zstd_load_cleanup; + decompress_ops->decompress_data = zstd_decompress_data; + decompress_ops->check_len = zstd_check_len; + break; +#endif + default: + return -1; + } + + return 0; +} + +static void clean_compress_ops(void) +{ + compress_ops->save_setup = NULL; + compress_ops->save_cleanup = NULL; + compress_ops->compress_data = NULL; + + g_free(compress_ops); + compress_ops = NULL; +} + +static void clean_decompress_ops(void) +{ + decompress_ops->load_setup = NULL; + decompress_ops->load_cleanup = NULL; + decompress_ops->decompress_data = NULL; + + g_free(decompress_ops); + decompress_ops = NULL; +} static void *do_data_compress(void *opaque) { CompressParam *param = opaque; RAMBlock *block; - ram_addr_t offset; bool zero_page; qemu_mutex_lock(¶m->mutex); while (!param->quit) { if (param->block) { block = param->block; - offset = param->offset; param->block = NULL; qemu_mutex_unlock(¶m->mutex); - zero_page = do_compress_ram_page(param->file, ¶m->stream, - block, offset, param->originbuf); + zero_page = do_compress_ram_page(param, block); qemu_mutex_lock(&comp_done_lock); param->done = true; @@ -512,7 +805,7 @@ static void compress_threads_save_cleanup(void) qemu_thread_join(compress_threads + i); qemu_mutex_destroy(&comp_param[i].mutex); qemu_cond_destroy(&comp_param[i].cond); - deflateEnd(&comp_param[i].stream); + compress_ops->save_cleanup(&comp_param[i]); g_free(comp_param[i].originbuf); qemu_fclose(comp_param[i].file); comp_param[i].file = NULL; @@ -523,6 +816,7 @@ static void compress_threads_save_cleanup(void) g_free(comp_param); compress_threads = NULL; comp_param = NULL; + clean_compress_ops(); } static int compress_threads_save_setup(void) @@ -532,6 +826,12 @@ static int compress_threads_save_setup(void) if (!migrate_use_compression()) { return 0; } + + if (set_compress_ops() < 0) { + clean_compress_ops(); + return -1; + } + thread_count = migrate_compress_threads(); compress_threads = g_new0(QemuThread, thread_count); comp_param = g_new0(CompressParam, thread_count); @@ -543,8 +843,7 @@ static int compress_threads_save_setup(void) goto exit; } - if (deflateInit(&comp_param[i].stream, - migrate_compress_level()) != Z_OK) { + if (compress_ops->save_setup(&comp_param[i]) < 0) { g_free(comp_param[i].originbuf); goto exit; } @@ -2212,28 +2511,29 @@ static int ram_save_multifd_page(RAMState *rs, RAMBlock *block, return 1; } -static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, - ram_addr_t offset, uint8_t *source_buf) +static bool do_compress_ram_page(CompressParam *param, RAMBlock *block) { RAMState *rs = ram_state; + ram_addr_t offset = param->offset; uint8_t *p = block->host + (offset & TARGET_PAGE_MASK); bool zero_page = false; int ret; - if (save_zero_page_to_file(rs, f, block, offset)) { + if (save_zero_page_to_file(rs, param->file, block, offset)) { zero_page = true; goto exit; } - save_page_header(rs, f, block, offset | RAM_SAVE_FLAG_COMPRESS_PAGE); + save_page_header(rs, param->file, block, + offset | RAM_SAVE_FLAG_COMPRESS_PAGE); /* * copy it to a internal buffer to avoid it being modified by VM * so that we can catch up the error during compression and * decompression */ - memcpy(source_buf, p, TARGET_PAGE_SIZE); - ret = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE); + memcpy(param->originbuf, p, TARGET_PAGE_SIZE); + ret = compress_ops->compress_data(param, TARGET_PAGE_SIZE); if (ret < 0) { qemu_file_set_error(migrate_get_current()->to_dst_file, ret); error_report("compressed data failed!"); @@ -3924,50 +4224,20 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size) } } -/* return the size after decompression, or negative value on error */ -static int -qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len, - const uint8_t *source, size_t source_len) -{ - int err; - - err = inflateReset(stream); - if (err != Z_OK) { - return -1; - } - - stream->avail_in = source_len; - stream->next_in = (uint8_t *)source; - stream->avail_out = dest_len; - stream->next_out = dest; - - err = inflate(stream, Z_NO_FLUSH); - if (err != Z_STREAM_END) { - return -1; - } - - return stream->total_out; -} - static void *do_data_decompress(void *opaque) { DecompressParam *param = opaque; - unsigned long pagesize; uint8_t *des; - int len, ret; + int ret; qemu_mutex_lock(¶m->mutex); while (!param->quit) { if (param->des) { des = param->des; - len = param->len; param->des = 0; qemu_mutex_unlock(¶m->mutex); - pagesize = TARGET_PAGE_SIZE; - - ret = qemu_uncompress_data(¶m->stream, des, pagesize, - param->compbuf, len); + ret = decompress_ops->decompress_data(param, des, TARGET_PAGE_SIZE); if (ret < 0 && migrate_get_current()->decompress_error_check) { error_report("decompress data failed"); qemu_file_set_error(decomp_file, ret); @@ -4037,7 +4307,7 @@ static void compress_threads_load_cleanup(void) qemu_thread_join(decompress_threads + i); qemu_mutex_destroy(&decomp_param[i].mutex); qemu_cond_destroy(&decomp_param[i].cond); - inflateEnd(&decomp_param[i].stream); + decompress_ops->load_cleanup(&decomp_param[i]); g_free(decomp_param[i].compbuf); decomp_param[i].compbuf = NULL; } @@ -4046,6 +4316,7 @@ static void compress_threads_load_cleanup(void) decompress_threads = NULL; decomp_param = NULL; decomp_file = NULL; + clean_decompress_ops(); } static int compress_threads_load_setup(QEMUFile *f) @@ -4056,6 +4327,11 @@ static int compress_threads_load_setup(QEMUFile *f) return 0; } + if (set_decompress_ops() < 0) { + clean_decompress_ops(); + return -1; + } + thread_count = migrate_decompress_threads(); decompress_threads = g_new0(QemuThread, thread_count); decomp_param = g_new0(DecompressParam, thread_count); @@ -4063,7 +4339,7 @@ static int compress_threads_load_setup(QEMUFile *f) qemu_cond_init(&decomp_done_cond); decomp_file = f; for (i = 0; i < thread_count; i++) { - if (inflateInit(&decomp_param[i].stream) != Z_OK) { + if (decompress_ops->load_setup(&decomp_param[i]) < 0) { goto exit; } @@ -4605,7 +4881,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id) case RAM_SAVE_FLAG_COMPRESS_PAGE: len = qemu_get_be32(f); - if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) { + if (decompress_ops->check_len(len)) { error_report("Invalid compressed data length: %d", len); ret = -EINVAL; break; diff --git a/monitor/hmp-cmds.c b/monitor/hmp-cmds.c index fc5d6b92c4b6b9dfaa8a3495dd4a0be30a11a140..e5a7a88ba2b303b97ab3c52a8a88ad8ce3a05489 100644 --- a/monitor/hmp-cmds.c +++ b/monitor/hmp-cmds.c @@ -41,6 +41,7 @@ #include "qapi/qapi-commands-tpm.h" #include "qapi/qapi-commands-ui.h" #include "qapi/qapi-visit-net.h" +#include "qapi/qapi-visit-migration.h" #include "qapi/qmp/qdict.h" #include "qapi/qmp/qerror.h" #include "qapi/string-input-visitor.h" @@ -426,6 +427,9 @@ void hmp_info_migrate_parameters(Monitor *mon, const QDict *qdict) MigrationParameter_str(MIGRATION_PARAMETER_DECOMPRESS_THREADS), params->decompress_threads); assert(params->has_cpu_throttle_initial); + monitor_printf(mon, "%s: %s\n", + MigrationParameter_str(MIGRATION_PARAMETER_COMPRESS_METHOD), + CompressMethod_str(params->compress_method)); monitor_printf(mon, "%s: %u\n", MigrationParameter_str(MIGRATION_PARAMETER_CPU_THROTTLE_INITIAL), params->cpu_throttle_initial); @@ -1756,6 +1760,7 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict) MigrateSetParameters *p = g_new0(MigrateSetParameters, 1); uint64_t valuebw = 0; uint64_t cache_size; + CompressMethod compress_method; Error *err = NULL; int val, ret; @@ -1781,6 +1786,14 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict) p->has_decompress_threads = true; visit_type_int(v, param, &p->decompress_threads, &err); break; + case MIGRATION_PARAMETER_COMPRESS_METHOD: + p->has_compress_method = true; + visit_type_CompressMethod(v, param, &compress_method, &err); + if (err) { + break; + } + p->compress_method = compress_method; + break; case MIGRATION_PARAMETER_CPU_THROTTLE_INITIAL: p->has_cpu_throttle_initial = true; visit_type_int(v, param, &p->cpu_throttle_initial, &err); diff --git a/qapi/migration.json b/qapi/migration.json index 6844ddfab3a238b6d0aee08e8e738904473b9c78..587ef65872e1cdeecb5350623b8bb5d64fec3839 100644 --- a/qapi/migration.json +++ b/qapi/migration.json @@ -482,6 +482,19 @@ ## { 'command': 'query-migrate-capabilities', 'returns': ['MigrationCapabilityStatus']} +## +# @CompressMethod: +# +# An enumeration of multi-thread compression methods. +# +# @zlib: use zlib compression method. +# +# Since: 5.0 +# +## +{ 'enum': 'CompressMethod', + 'data': [ 'zlib', { 'name': 'zstd', 'if': 'defined(CONFIG_ZSTD)' } ] } + ## # @MigrationParameter: # @@ -518,6 +531,9 @@ # compression, so set the decompress-threads to the number about 1/4 # of compress-threads is adequate. # +# @compress-method: Which multi-thread compression method to use. +# Defaults to none. (Since 5.0) +# # @cpu-throttle-initial: Initial percentage of time guest cpus are throttled # when migration auto-converge is activated. The # default value is 20. (Since 2.7) @@ -586,7 +602,7 @@ 'data': ['announce-initial', 'announce-max', 'announce-rounds', 'announce-step', 'compress-level', 'compress-threads', 'decompress-threads', - 'compress-wait-thread', + 'compress-wait-thread', 'compress-method', 'cpu-throttle-initial', 'cpu-throttle-increment', 'tls-creds', 'tls-hostname', 'tls-authz', 'max-bandwidth', 'downtime-limit', 'x-checkpoint-delay', 'block-incremental', @@ -620,6 +636,9 @@ # # @decompress-threads: decompression thread count # +# @compress-method: Set compression method to use in multi-thread compression. +# Defaults to none. (Since 5.0) +# # @cpu-throttle-initial: Initial percentage of time guest cpus are # throttled when migration auto-converge is activated. # The default value is 20. (Since 2.7) @@ -695,6 +714,7 @@ '*compress-threads': 'int', '*compress-wait-thread': 'bool', '*decompress-threads': 'int', + '*compress-method': 'CompressMethod', '*cpu-throttle-initial': 'int', '*cpu-throttle-increment': 'int', '*tls-creds': 'StrOrNull', @@ -753,6 +773,9 @@ # # @decompress-threads: decompression thread count # +# @compress-method: Which multi-thread compression method to use. +# Defaults to none. (Since 5.0) +# # @cpu-throttle-initial: Initial percentage of time guest cpus are # throttled when migration auto-converge is activated. # (Since 2.7) @@ -828,6 +851,7 @@ '*compress-threads': 'uint8', '*compress-wait-thread': 'bool', '*decompress-threads': 'uint8', + '*compress-method': 'CompressMethod', '*cpu-throttle-initial': 'uint8', '*cpu-throttle-increment': 'uint8', '*tls-creds': 'str',