From a622284b9a7c6da6868d5c622cd3357e9fa701f4 Mon Sep 17 00:00:00 2001 From: zqq Date: Tue, 23 Dec 2025 17:02:18 +0800 Subject: [PATCH 1/2] add ignore error: SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT Signed-off-by: zqq --- .../distributeddb/common/include/db_common.h | 2 + .../distributeddb/common/include/db_errno.h | 1 + .../distributeddb/common/src/db_common.cpp | 25 +++++++-- .../interfaces/include/store_types.h | 1 + .../interfaces/src/kv_store_errno.cpp | 1 + .../include/cloud/cloud_storage_utils.h | 2 + .../storage/src/cloud/cloud_storage_utils.cpp | 15 ++++-- ...ver_relational_storage_extend_executor.cpp | 11 ++-- .../syncer/src/cloud/cloud_db_proxy.cpp | 6 ++- .../syncer/src/cloud/cloud_sync_utils.cpp | 51 +++++++++++++++++-- .../syncer/src/cloud/cloud_sync_utils.h | 15 +++++- .../syncer/src/cloud/cloud_syncer.cpp | 4 +- .../syncer/src/cloud/cloud_syncer.h | 2 + .../syncer/src/cloud/cloud_syncer_extend.cpp | 13 ++--- .../src/cloud/cloud_syncer_extend_extend.cpp | 10 ++++ .../common/common/rdb_data_generator.cpp | 1 + .../rdb/distributeddb_basic_rdb_test.cpp | 36 ++++++++++++- .../common/syncer/cloud/virtual_cloud_db.cpp | 34 +++++++------ .../common/syncer/cloud/virtual_cloud_db.h | 4 +- 19 files changed, 180 insertions(+), 54 deletions(-) diff --git a/frameworks/libs/distributeddb/common/include/db_common.h b/frameworks/libs/distributeddb/common/include/db_common.h index 68ea51bedec..96a2db07ae4 100644 --- a/frameworks/libs/distributeddb/common/include/db_common.h +++ b/frameworks/libs/distributeddb/common/include/db_common.h @@ -131,6 +131,8 @@ public: static bool IsRecordAssetsMissing(const VBucket &record); + static bool IsRecordAssetsSpaceInsufficient(const VBucket &record); + static bool IsRecordDelete(const VBucket &record); static bool IsCloudRecordNotFound(const VBucket &record); diff --git a/frameworks/libs/distributeddb/common/include/db_errno.h b/frameworks/libs/distributeddb/common/include/db_errno.h index f847c4be63b..3bc7b9caade 100644 --- a/frameworks/libs/distributeddb/common/include/db_errno.h +++ b/frameworks/libs/distributeddb/common/include/db_errno.h @@ -195,6 +195,7 @@ constexpr const int E_FEEDBACK_DB_CLOSING = (E_BASE + 208); // Db was closing fe constexpr const int E_NEED_CORRECT_TARGET_USER = (E_BASE + 209); constexpr const int E_CLOUD_ASSET_NOT_FOUND = (E_BASE + 210); // Cloud download asset return 404 error constexpr const int E_TASK_INTERRUPTED = (E_BASE + 211); // Task(cloud sync, generate log) interrupted +constexpr const int E_SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT = (E_BASE + 212); // Upload failed by cloud space insufficient } // namespace DistributedDB #endif // DISTRIBUTEDDB_ERRNO_H diff --git a/frameworks/libs/distributeddb/common/src/db_common.cpp b/frameworks/libs/distributeddb/common/src/db_common.cpp index 9d8d3317f0d..f1940f6ca64 100644 --- a/frameworks/libs/distributeddb/common/src/db_common.cpp +++ b/frameworks/libs/distributeddb/common/src/db_common.cpp @@ -24,6 +24,7 @@ #endif #include #include +#include #include "cloud/cloud_db_constant.h" #include "cloud/cloud_db_types.h" @@ -481,11 +482,15 @@ bool DBCommon::IsRecordError(const VBucket &record) if (record.at(CloudDbConstant::ERROR_FIELD).index() != TYPE_INDEX) { return false; } + static std::unordered_set ignoreErrCodes = { + static_cast(DBStatus::CLOUD_RECORD_EXIST_CONFLICT), + static_cast(DBStatus::CLOUD_RECORD_ALREADY_EXISTED), + static_cast(DBStatus::CLOUD_RECORD_NOT_FOUND), + static_cast(DBStatus::LOCAL_ASSET_NOT_FOUND), + static_cast(DBStatus::SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT), + }; auto status = std::get(record.at(CloudDbConstant::ERROR_FIELD)); - return status != static_cast(DBStatus::CLOUD_RECORD_EXIST_CONFLICT) && - status != static_cast(DBStatus::CLOUD_RECORD_ALREADY_EXISTED) && - status != static_cast(DBStatus::CLOUD_RECORD_NOT_FOUND) && - status != static_cast(DBStatus::LOCAL_ASSET_NOT_FOUND); + return ignoreErrCodes.find(status) == ignoreErrCodes.end(); } bool DBCommon::IsIntTypeRecordError(const VBucket &record) @@ -541,6 +546,18 @@ bool DBCommon::IsRecordAssetsMissing(const VBucket &record) return status == static_cast(DBStatus::LOCAL_ASSET_NOT_FOUND); } +bool DBCommon::IsRecordAssetsSpaceInsufficient(const VBucket &record) +{ + if (record.find(CloudDbConstant::ERROR_FIELD) == record.end()) { + return false; + } + if (record.at(CloudDbConstant::ERROR_FIELD).index() != TYPE_INDEX) { + return false; + } + auto status = std::get(record.at(CloudDbConstant::ERROR_FIELD)); + return status == static_cast(DBStatus::SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT); +} + bool DBCommon::IsRecordDelete(const VBucket &record) { if (record.find(CloudDbConstant::DELETE_FIELD) == record.end()) { diff --git a/frameworks/libs/distributeddb/interfaces/include/store_types.h b/frameworks/libs/distributeddb/interfaces/include/store_types.h index 833c4998c51..e74ae3b0c2f 100644 --- a/frameworks/libs/distributeddb/interfaces/include/store_types.h +++ b/frameworks/libs/distributeddb/interfaces/include/store_types.h @@ -98,6 +98,7 @@ enum DBStatus { NEED_CORRECT_TARGET_USER, // The target user ID is incorrect and needs to be re-obtained CLOUD_ASSET_NOT_FOUND, // The cloud download asset return 404 error TASK_INTERRUPTED, // Task(cloud sync) interrupted + SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT, // Whitelist for contact, skip when cloud space insufficient BUTT_STATUS = 27394048 // end of status }; diff --git a/frameworks/libs/distributeddb/interfaces/src/kv_store_errno.cpp b/frameworks/libs/distributeddb/interfaces/src/kv_store_errno.cpp index 61ccbb5ebb6..cbe2ccdd205 100644 --- a/frameworks/libs/distributeddb/interfaces/src/kv_store_errno.cpp +++ b/frameworks/libs/distributeddb/interfaces/src/kv_store_errno.cpp @@ -89,6 +89,7 @@ namespace { { -E_DISTRIBUTED_FIELD_DECREASE, DISTRIBUTED_FIELD_DECREASE }, { -E_CLOUD_ASSET_NOT_FOUND, CLOUD_ASSET_NOT_FOUND }, { -E_TASK_INTERRUPTED, TASK_INTERRUPTED }, + { -E_SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT, SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT }, }; } diff --git a/frameworks/libs/distributeddb/storage/include/cloud/cloud_storage_utils.h b/frameworks/libs/distributeddb/storage/include/cloud/cloud_storage_utils.h index 94f2299e52c..8e4e70dc69b 100644 --- a/frameworks/libs/distributeddb/storage/include/cloud/cloud_storage_utils.h +++ b/frameworks/libs/distributeddb/storage/include/cloud/cloud_storage_utils.h @@ -221,6 +221,8 @@ public: static int ConvertLogToLocal(sqlite3 *dbHandle, const std::string &tableName, const std::vector &gids); + + static bool IsNeedMarkUploadFinishedWithErr(const VBucket &record); private: static int IdentifyCloudTypeInner(CloudSyncData &cloudSyncData, VBucket &data, VBucket &log, VBucket &flags); diff --git a/frameworks/libs/distributeddb/storage/src/cloud/cloud_storage_utils.cpp b/frameworks/libs/distributeddb/storage/src/cloud/cloud_storage_utils.cpp index 130ea49df61..16adba32afb 100644 --- a/frameworks/libs/distributeddb/storage/src/cloud/cloud_storage_utils.cpp +++ b/frameworks/libs/distributeddb/storage/src/cloud/cloud_storage_utils.cpp @@ -773,7 +773,7 @@ int CloudStorageUtils::FillAssetForAbnormal(Asset &asset, Asset &dbAsset, { dbAsset.assetId = asset.assetId; dbAsset.status = AssetStatus::ABNORMAL; - LOGW("Asset %s not found locally, status set to ABNORMAL", DBCommon::StringMiddleMasking(asset.assetId).c_str()); + LOGW("Asset %s status set to ABNORMAL", DBCommon::StringMiddleMasking(asset.assetId).c_str()); return E_OK; } @@ -987,7 +987,7 @@ std::string CloudStorageUtils::GetUpdateRecordFlagSqlUpload(const std::string &t if (isNeedCompensated && !(isDeleted && gidEmpty)) { sql += "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET flag = (CASE WHEN timestamp = ? OR " + "flag & 0x01 = 0 THEN flag | " + compensatedBit + " ELSE flag"; - } else if (DBCommon::IsRecordAssetsMissing(uploadExtend)) { + } else if (IsNeedMarkUploadFinishedWithErr(uploadExtend)) { sql += "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET flag = (CASE WHEN timestamp = ? THEN " + "(flag & ~" + compensatedBit + " & ~" + inconsistencyBit + ") | " + uploadFinishBit + " ELSE (flag & ~" + compensatedBit + ") | " + uploadFinishBit; @@ -1435,8 +1435,8 @@ int CloudStorageUtils::HandleRecordErrorOrAssetsMissing(SQLiteSingleVerRelationa { std::string sql = CloudStorageUtils::GetUpdateRecordFlagSqlUpload( param.tableName, DBCommon::IsRecordIgnored(record), logInfo, record, param.type); - if (DBCommon::IsRecordAssetsMissing(record)) { - LOGI("[CloudStorageUtils][UpdateRecordFlagAfterUpload] Record assets missing, skip update."); + if (IsNeedMarkUploadFinishedWithErr(record)) { + LOGI("[CloudStorageUtils] Record need update flag."); int errCode = handle->UpdateRecordFlag(param.tableName, sql, logInfo); if (errCode != E_OK) { LOGE("[CloudStorageUtils] Update record flag failed"); @@ -1470,7 +1470,7 @@ int CloudStorageUtils::UpdateRecordFlagAfterUpload(SQLiteSingleVerRelationalStor logInfo.timestamp = updateData.timestamp[i]; logInfo.dataKey = updateData.rowid[i]; logInfo.hashKey = updateData.hashKey[i]; - if (DBCommon::IsRecordError(record) || DBCommon::IsRecordAssetsMissing(record) || + if (DBCommon::IsRecordError(record) || IsNeedMarkUploadFinishedWithErr(record) || DBCommon::IsRecordVersionConflict(record) || isLock) { errCode = CloudStorageUtils::HandleRecordErrorOrAssetsMissing(handle, record, logInfo, param); if (errCode != E_OK) { @@ -1602,4 +1602,9 @@ int CloudStorageUtils::ConvertLogToLocal(sqlite3 *dbHandle, const std::string &t LOGI("[CloudStorageUtils][ConvertLogToLocal] cnt:%zu, %" PRIu64, gids.size(), count); return E_OK; } + +bool CloudStorageUtils::IsNeedMarkUploadFinishedWithErr(const VBucket &record) +{ + return DBCommon::IsRecordAssetsMissing(record) || DBCommon::IsRecordAssetsSpaceInsufficient(record); +} } diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_extend_executor.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_extend_executor.cpp index cbf428d240f..1b9d5293ffe 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_extend_executor.cpp +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_extend_executor.cpp @@ -354,7 +354,8 @@ int SQLiteSingleVerRelationalStorageExecutor::InitFillUploadAssetStatement(OpTyp if (DBCommon::IsRecordAssetsMissing(data.extend.at(index))) { CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets, CloudStorageUtils::FillAssetForAbnormal, CloudStorageUtils::FillAssetsForAbnormal); - } else if (DBCommon::IsRecordError(data.extend.at(index))) { + } else if (DBCommon::IsRecordAssetsSpaceInsufficient(data.extend.at(index)) || + DBCommon::IsRecordError(data.extend.at(index))) { CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets, CloudStorageUtils::FillAssetForUploadFailed, CloudStorageUtils::FillAssetsForUploadFailed); } else { @@ -969,12 +970,8 @@ int SQLiteSingleVerRelationalStorageExecutor::BindStmtWithCloudGid(const CloudSy for (size_t i = 0; i < cloudDataResult.insData.extend.size(); ++i) { auto gidEntry = cloudDataResult.insData.extend[i].find(CloudDbConstant::GID_FIELD); if (gidEntry == cloudDataResult.insData.extend[i].end()) { - bool isSkipAssetsMissRecord = false; - if (DBCommon::IsRecordAssetsMissing(cloudDataResult.insData.extend[i])) { - LOGI("[RDBExecutor] Local assets missing and skip filling assets."); - isSkipAssetsMissRecord = true; - } - if (ignoreEmptyGid || isSkipAssetsMissRecord) { + if (ignoreEmptyGid || + CloudStorageUtils::IsNeedMarkUploadFinishedWithErr(cloudDataResult.insData.extend[i])) { continue; } errCode = -E_INVALID_ARGS; diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.cpp index ec08bc8c7f5..e616fe9b9c3 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.cpp +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.cpp @@ -15,6 +15,7 @@ #include "cloud_db_proxy.h" #include "cloud/cloud_db_constant.h" #include "cloud/cloud_storage_utils.h" +#include "cloud/cloud_sync_utils.h" #include "db_common.h" #include "db_errno.h" #include "kv_store_errno.h" @@ -503,6 +504,8 @@ int CloudDBProxy::GetInnerErrorCode(DBStatus status) return -E_CLOUD_DISABLED; case CLOUD_ASSET_NOT_FOUND: return -E_CLOUD_ASSET_NOT_FOUND; + case SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT: + return -E_SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT; default: return -E_CLOUD_ERROR; } @@ -636,8 +639,7 @@ bool CloudDBProxy::CloudActionContext::IsEmptyAssetId(const Assets &assets) bool CloudDBProxy::CloudActionContext::IsRecordActionFail(const VBucket &extend, const CloudWaterType &type, DBStatus status) { - if (DBCommon::IsRecordAssetsMissing(extend) || DBCommon::IsRecordIgnoredForReliability(extend, type) || - DBCommon::IsRecordIgnored(extend)) { + if (CloudSyncUtils::IsIgnoreFailAction(extend, type)) { return false; } if (extend.count(CloudDbConstant::GID_FIELD) == 0 || DBCommon::IsRecordFailed(extend, status)) { diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_sync_utils.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_sync_utils.cpp index ad86d4efe53..c969dc767e8 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_sync_utils.cpp +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_sync_utils.cpp @@ -458,7 +458,7 @@ bool CloudSyncUtils::IsAssetsMissing(const std::vector &extend) return false; } for (size_t i = 0; i < extend.size(); ++i) { - if (DBCommon::IsIntTypeRecordError(extend[i]) && DBCommon::IsRecordAssetsMissing(extend[i])) { + if (DBCommon::IsRecordAssetsMissing(extend[i])) { return true; } } @@ -474,11 +474,11 @@ int CloudSyncUtils::FillAssetIdToAssets(CloudSyncBatch &data, int errorCode, con int errCode = E_OK; for (size_t i = 0; i < data.assets.size(); i++) { if (data.assets[i].empty() || DBCommon::IsRecordIgnored(data.extend[i]) || - (errorCode != E_OK && - (DBCommon::IsRecordError(data.extend[i]) || DBCommon::IsRecordAssetsMissing(data.extend[i]))) || + IsIgnoreFailAssetErr(data.extend[i]) || + (errorCode != E_OK && DBCommon::IsRecordError(data.extend[i])) || DBCommon::IsNeedCompensatedForUpload(data.extend[i], type)) { - if (errCode != E_OK && DBCommon::IsRecordAssetsMissing(data.extend[i])) { - LOGI("[CloudSyncUtils][FileAssetIdToAssets] errCode with assets missing, skip fill assets id"); + if (IsIgnoreFailAssetErr(data.extend[i])) { + LOGI("[CloudSyncUtils][FileAssetIdToAssets] skip fill assets id by ignore error"); } continue; } @@ -1083,4 +1083,45 @@ bool CloudSyncUtils::NotNeedToCompensated(int errCode) } return false; } + +bool CloudSyncUtils::IsCloudErrorWithoutAbort(int errCode) +{ + return errCode == -E_LOCAL_ASSET_NOT_FOUND || errCode == -E_SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT; +} + +bool CloudSyncUtils::IsAssetsSpaceInsufficient(const std::vector &extend) +{ + if (extend.empty()) { + return false; + } + for (size_t i = 0; i < extend.size(); ++i) { + if (DBCommon::IsRecordAssetsSpaceInsufficient(extend[i])) { + return true; + } + } + return false; +} + +int CloudSyncUtils::GetNoAbortErrorCode(bool isInsert, const CloudSyncData &uploadData) +{ + const std::vector &extend = isInsert ? uploadData.insData.extend : uploadData.updData.extend; + if (CloudSyncUtils::IsAssetsMissing(extend)) { + return -E_LOCAL_ASSET_NOT_FOUND; + } + if (CloudSyncUtils::IsAssetsSpaceInsufficient(extend)) { + return -E_SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT; + } + return E_OK; +} + +bool CloudSyncUtils::IsIgnoreFailAction(const VBucket &extend, const CloudWaterType &type) +{ + return IsIgnoreFailAssetErr(extend) || DBCommon::IsRecordIgnoredForReliability(extend, type) || + DBCommon::IsRecordIgnored(extend); +} + +bool CloudSyncUtils::IsIgnoreFailAssetErr(const VBucket &extend) +{ + return DBCommon::IsRecordAssetsMissing(extend) || DBCommon::IsRecordAssetsSpaceInsufficient(extend); +} } \ No newline at end of file diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_sync_utils.h b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_sync_utils.h index 4b0024704a6..3afab31edd5 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_sync_utils.h +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_sync_utils.h @@ -152,12 +152,23 @@ public: static bool CanStartAsyncDownload(int scheduleCount); - static bool IsAssetsMissing(const std::vector &extend); - static bool NotNeedToCompensated(int errCode); + + static bool IsCloudErrorWithoutAbort(int errCode); + + static bool IsAssetsSpaceInsufficient(const std::vector &extend); + + // if exist error but not abort sync task, return it errCode + static int GetNoAbortErrorCode(bool isInsert, const CloudSyncData &uploadData); + + static bool IsIgnoreFailAction(const VBucket &extend, const CloudWaterType &type); private: static void InsertOrReplaceChangedDataByType(ChangeType type, std::vector &pkVal, ChangedData &changedData); + + static bool IsIgnoreFailAssetErr(const VBucket &extend); + + static bool IsAssetsMissing(const std::vector &extend); }; } #endif // CLOUD_SYNC_UTILS_H \ No newline at end of file diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp index 79368a04398..e1890fe3dc4 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp @@ -1102,9 +1102,9 @@ void CloudSyncer::NotifyInBatchUpload(const UploadParam &uploadParam, const Inne currentContext_.notifier->UpdateProcess(innerProcessInfo); } else { currentContext_.notifier->NotifyProcess(taskInfo, innerProcessInfo, - taskInfo.errCode == -E_LOCAL_ASSET_NOT_FOUND); + CloudSyncUtils::IsCloudErrorWithoutAbort(taskInfo.errCode)); } - if (taskInfo.errCode == -E_LOCAL_ASSET_NOT_FOUND) { + if (CloudSyncUtils::IsCloudErrorWithoutAbort(taskInfo.errCode)) { cloudTaskInfos_[uploadParam.taskId].errCode = E_OK; } } diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h index a104ac8eeda..f34cb9f455b 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h @@ -568,6 +568,8 @@ protected: void RetainCurrentTaskInfo(TaskId taskId); + void SetCurrentTmpError(int errCode); + mutable std::mutex dataLock_; TaskId lastTaskId_; std::multimap> taskQueue_; diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend.cpp index 86104600c85..865a68d1bbc 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend.cpp +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend.cpp @@ -190,13 +190,9 @@ int CloudSyncer::BatchInsertOrUpdate(Info &uploadInfo, CloudSyncData &uploadData } innerProcessInfo.upLoadInfo.successCount += uploadInfo.successCount; innerProcessInfo.upLoadInfo.failCount += uploadInfo.failCount; - bool isLocalAssetNotFound = isInsert ? CloudSyncUtils::IsAssetsMissing(uploadData.insData.extend): - CloudSyncUtils::IsAssetsMissing(uploadData.updData.extend); - if (errCode == E_OK && isLocalAssetNotFound) { - TaskId currentTaskId = GetCurrentTaskId(); - std::lock_guard guard(dataLock_); - cloudTaskInfos_[currentTaskId].errCode = -E_LOCAL_ASSET_NOT_FOUND; - cloudTaskInfos_[currentTaskId].tempErrCode = -E_LOCAL_ASSET_NOT_FOUND; + int noAbortErrCode = CloudSyncUtils::GetNoAbortErrorCode(isInsert, uploadData); + if (noAbortErrCode != E_OK) { + SetCurrentTmpError(noAbortErrCode); } if (errCode == -E_CLOUD_VERSION_CONFLICT) { ProcessVersionConflictInfo(innerProcessInfo, retryCount); @@ -233,11 +229,10 @@ int CloudSyncer::BackFillAfterBatchUpload(CloudSyncData &uploadData, bool isInse CloudSyncBatch &data = isInsert ? uploadData.insData : uploadData.updData; bool isSkip = CloudSyncUtils::IsSkipAssetsMissingRecord(data.extend); if (isSkip) { - LOGI("[CloudSyncer][BackFillAfterBatchUpload] Try to FillCloudLogAndAsset when assets missing: %d", + LOGI("[CloudSyncer][BackFillAfterBatchUpload] Skip FillCloudLogAndAsset: %d", errCode); return E_OK; } else { - LOGE("[CloudSyncer][BackFillAfterBatchUpload] errCode: %d, can not skip assets missing record.", errCode); return errCode; } } diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend_extend.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend_extend.cpp index b9d632ecade..e3dea56545f 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend_extend.cpp +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend_extend.cpp @@ -217,4 +217,14 @@ void CloudSyncer::RetainCurrentTaskInfo(TaskId taskId) resumeTaskInfos_.clear(); } } + +void CloudSyncer::SetCurrentTmpError(int errCode) +{ + std::lock_guard guard(dataLock_); + if (cloudTaskInfos_.find(currentContext_.currentTaskId) == cloudTaskInfos_.end()) { + return; + } + cloudTaskInfos_[currentContext_.currentTaskId].errCode = errCode; + cloudTaskInfos_[currentContext_.currentTaskId].tempErrCode = errCode; +} } // namespace DistributedDB \ No newline at end of file diff --git a/frameworks/libs/distributeddb/test/unittest/common/common/rdb_data_generator.cpp b/frameworks/libs/distributeddb/test/unittest/common/common/rdb_data_generator.cpp index ec8cf0405ef..7da0c77b607 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/common/rdb_data_generator.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/common/rdb_data_generator.cpp @@ -143,6 +143,7 @@ Asset RDBDataGenerator::GenerateAsset(int64_t index, const DistributedDB::Field Asset asset; asset.name = field.colName + "_" + std::to_string(index); asset.hash = "default_hash"; + asset.status = AssetStatus::INSERT; return asset; } diff --git a/frameworks/libs/distributeddb/test/unittest/common/store_test/rdb/distributeddb_basic_rdb_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/store_test/rdb/distributeddb_basic_rdb_test.cpp index a48d3951c98..72753cc8b29 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/store_test/rdb/distributeddb_basic_rdb_test.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/store_test/rdb/distributeddb_basic_rdb_test.cpp @@ -257,8 +257,9 @@ HWTEST_F(DistributedDBBasicRDBTest, RdbCloudSyncExample004, TestSize.Level0) std::string sql = "UPDATE " + g_defaultTable1 + " SET name='update'"; EXPECT_EQ(ExecuteSQL(sql, info1), E_OK); EXPECT_EQ(RDBGeneralUt::CountTableData(info1, g_defaultTable1), 2); + virtualCloudDb->SetLocalAssetNotFound(false); RDBGeneralUt::CloudBlockSync(info1, query); - EXPECT_EQ(RDBGeneralUt::GetAbnormalCount(g_defaultTable1, DBStatus::LOCAL_ASSET_NOT_FOUND), 2); + EXPECT_EQ(RDBGeneralUt::GetAbnormalCount(g_defaultTable1, DBStatus::LOCAL_ASSET_NOT_FOUND), 0); } @@ -279,7 +280,7 @@ HWTEST_F(DistributedDBBasicRDBTest, RdbCloudSyncExample005, TestSize.Level0) ASSERT_EQ(BasicUnitTest::InitDelegate(info1, "dev1"), E_OK); InsertLocalDBData(0, 2, info1); EXPECT_EQ(RDBGeneralUt::CountTableData(info1, g_defaultTable1), 2); - + std::shared_ptr virtualCloudDb = RDBGeneralUt::GetVirtualCloudDb(); ASSERT_NE(virtualCloudDb, nullptr); virtualCloudDb->SetLocalAssetNotFound(true); @@ -341,6 +342,37 @@ HWTEST_F(DistributedDBBasicRDBTest, RdbCloudSyncExample006, TestSize.Level0) EXPECT_EQ(RDBGeneralUt::GetCloudDataCount(g_defaultTable1), 0); EXPECT_EQ(RDBGeneralUt::GetCloudDataCount(g_defaultTable2), 2); } + +/** + * @tc.name: RdbCloudSyncExample008 + * @tc.desc: Test upload failed, when return SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT + * @tc.type: FUNC + * @tc.author: zqq + */ +HWTEST_F(DistributedDBBasicRDBTest, RdbCloudSyncExample008, TestSize.Level0) +{ + RelationalStoreDelegate::Option option; + option.tableMode = DistributedTableMode::COLLABORATION; + SetOption(option); + auto info1 = GetStoreInfo1(); + ASSERT_EQ(BasicUnitTest::InitDelegate(info1, "dev1"), E_OK); + InsertLocalDBData(0, 1, info1); + EXPECT_EQ(RDBGeneralUt::CountTableData(info1, g_defaultTable1), 1); + + std::shared_ptr virtualCloudDb = RDBGeneralUt::GetVirtualCloudDb(); + ASSERT_NE(virtualCloudDb, nullptr); + virtualCloudDb->SetUploadRecordStatus(DBStatus::SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT); + + ASSERT_EQ(SetDistributedTables(info1, {g_defaultTable1}, TableSyncType::CLOUD_COOPERATION), E_OK); + RDBGeneralUt::SetCloudDbConfig(info1); + Query query = Query::Select().FromTable({g_defaultTable1}); + RDBGeneralUt::CloudBlockSync(info1, query, OK, SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT); + + std::string sql = "UPDATE " + g_defaultTable1 + " SET name='update'"; + EXPECT_EQ(ExecuteSQL(sql, info1), E_OK); + virtualCloudDb->SetUploadRecordStatus(DBStatus::OK); + RDBGeneralUt::CloudBlockSync(info1, query, OK, OK); +} #endif // USE_DISTRIBUTEDDB_CLOUD /** diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.cpp index 0bf32450f78..652640c269e 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.cpp @@ -91,11 +91,8 @@ DBStatus VirtualCloudDb::InnerBatchInsert(const std::string &tableName, std::vec if (conflictInUpload_) { extend[i][CloudDbConstant::ERROR_FIELD] = static_cast(DBStatus::CLOUD_RECORD_EXIST_CONFLICT); } - if (localAssetNotFound_) { - extend[i][CloudDbConstant::ERROR_FIELD] = static_cast(DBStatus::LOCAL_ASSET_NOT_FOUND); - } - if (cloudSpaceInsufficient_) { - extend[i][CloudDbConstant::ERROR_FIELD] = static_cast(DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT); + if (uploadRecordStatus_ != DBStatus::OK) { + extend[i][CloudDbConstant::ERROR_FIELD] = static_cast(uploadRecordStatus_); } extend[i][g_gidField] = std::to_string(currentGid_++); extend[i][g_cursorField] = std::to_string(currentCursor_++); @@ -112,7 +109,7 @@ DBStatus VirtualCloudDb::InnerBatchInsert(const std::string &tableName, std::vec cloudData_[tableName].push_back(cloudData); auto gid = std::get(extend[i][g_gidField]); } - return res; + return res == OK ? uploadRecordStatus_ : res; } DBStatus VirtualCloudDb::BatchInsertWithGid(const std::string &tableName, std::vector &&record, @@ -190,12 +187,13 @@ DBStatus VirtualCloudDb::BatchDelete(const std::string &tableName, std::vector &&record, @@ -481,11 +479,8 @@ DBStatus VirtualCloudDb::InnerUpdateWithoutLock(const std::string &tableName, st if (conflictInUpload_) { extend[i][CloudDbConstant::ERROR_FIELD] = static_cast(DBStatus::CLOUD_RECORD_EXIST_CONFLICT); } - if (localAssetNotFound_) { - extend[i][CloudDbConstant::ERROR_FIELD] = static_cast(DBStatus::LOCAL_ASSET_NOT_FOUND); - } - if (cloudSpaceInsufficient_) { - extend[i][CloudDbConstant::ERROR_FIELD] = static_cast(DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT); + if (uploadRecordStatus_ != DBStatus::OK) { + extend[i][CloudDbConstant::ERROR_FIELD] = static_cast(uploadRecordStatus_); } extend[i][g_cursorField] = std::to_string(currentCursor_++); AddAssetIdForExtend(record[i], extend[i]); @@ -682,7 +677,9 @@ void VirtualCloudDb::AddAssetIdForExtend(VBucket &record, VBucket &extend) for (auto &recordData : record) { if (recordData.second.index() == TYPE_INDEX) { auto &asset = std::get(recordData.second); - if (asset.flag == static_cast(DistributedDB::AssetOpType::INSERT)) { + if (uploadRecordStatus_ == DBStatus::SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT) { + asset.status = static_cast(AssetStatus::ABNORMAL); + } else if (asset.flag == static_cast(DistributedDB::AssetOpType::INSERT)) { asset.assetId = "10"; } extend[recordData.first] = asset; @@ -698,7 +695,9 @@ void VirtualCloudDb::AddAssetIdForExtend(VBucket &record, VBucket &extend) void VirtualCloudDb::AddAssetsIdInner(Assets &assets) { for (auto &asset : assets) { - if (asset.flag == static_cast(DistributedDB::AssetOpType::INSERT)) { + if (uploadRecordStatus_ == DBStatus::SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT) { + asset.status = static_cast(AssetStatus::ABNORMAL); + } else if (asset.flag == static_cast(DistributedDB::AssetOpType::INSERT)) { asset.assetId = "10"; } } @@ -719,4 +718,9 @@ void VirtualCloudDb::ForkAfterQueryResult(const std::function &&record, std::vector &extend); @@ -138,7 +140,6 @@ private: std::atomic heartbeatError_ = false; std::atomic lockStatus_ = false; std::atomic conflictInUpload_ = false; - std::atomic localAssetNotFound_ = false; std::atomic blockTimeMs_ = 0; std::atomic heartbeatBlockTimeMs_ = 0; std::atomic currentGid_ = 0; @@ -165,6 +166,7 @@ private: std::vector &)> forkUploadConflictFunc_; std::function insertCheckFunc_; std::function &)> forkAfterQueryResult_; + DBStatus uploadRecordStatus_ = OK; }; } #endif // VIRTUAL_CLOUD_DB_H -- Gitee From 557df54a4206c27fcb07bb3505f997aceb19905b Mon Sep 17 00:00:00 2001 From: zqq Date: Sat, 27 Dec 2025 10:58:18 +0800 Subject: [PATCH 2/2] fixbug in ignore error asset Signed-off-by: zqq --- .../libs/distributeddb/syncer/src/cloud/cloud_sync_utils.cpp | 4 ++-- .../libs/distributeddb/syncer/src/cloud/cloud_sync_utils.h | 3 ++- .../distributeddb/syncer/src/cloud/cloud_syncer_extend.cpp | 5 ++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_sync_utils.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_sync_utils.cpp index c969dc767e8..7a1a8b8bb5c 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_sync_utils.cpp +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_sync_utils.cpp @@ -439,13 +439,13 @@ void CloudSyncUtils::ClearWithoutData(ICloudSyncer::SyncParam ¶m) param.withoutRowIdData.assetInsertData.clear(); } -bool CloudSyncUtils::IsSkipAssetsMissingRecord(const std::vector &extend) +bool CloudSyncUtils::IsSkipErrAssetsRecord(const std::vector &extend) { if (extend.empty()) { return false; } for (size_t i = 0; i < extend.size(); ++i) { - if (DBCommon::IsIntTypeRecordError(extend[i]) && !DBCommon::IsRecordAssetsMissing(extend[i])) { + if (DBCommon::IsIntTypeRecordError(extend[i]) && !IsIgnoreFailAssetErr(extend[i])) { return false; } } diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_sync_utils.h b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_sync_utils.h index 3afab31edd5..8962da57010 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_sync_utils.h +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_sync_utils.h @@ -83,7 +83,7 @@ public: static void ClearWithoutData(ICloudSyncer::SyncParam ¶m); - static bool IsSkipAssetsMissingRecord(const std::vector &extend); + static bool IsSkipErrAssetsRecord(const std::vector &extend); static int FillAssetIdToAssets(CloudSyncBatch &data, int errorCode, const CloudWaterType &type); @@ -162,6 +162,7 @@ public: static int GetNoAbortErrorCode(bool isInsert, const CloudSyncData &uploadData); static bool IsIgnoreFailAction(const VBucket &extend, const CloudWaterType &type); + private: static void InsertOrReplaceChangedDataByType(ChangeType type, std::vector &pkVal, ChangedData &changedData); diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend.cpp index 865a68d1bbc..609f3cdf727 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend.cpp +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend.cpp @@ -227,10 +227,9 @@ int CloudSyncer::BackFillAfterBatchUpload(CloudSyncData &uploadData, bool isInse if (errCode != E_OK) { storageProxy_->FillCloudGidAndLogIfSuccess(opType, uploadData); CloudSyncBatch &data = isInsert ? uploadData.insData : uploadData.updData; - bool isSkip = CloudSyncUtils::IsSkipAssetsMissingRecord(data.extend); + bool isSkip = CloudSyncUtils::IsSkipErrAssetsRecord(data.extend); if (isSkip) { - LOGI("[CloudSyncer][BackFillAfterBatchUpload] Skip FillCloudLogAndAsset: %d", - errCode); + LOGI("[CloudSyncer][BackFillAfterBatchUpload] Skip errCode %d", errCode); return E_OK; } else { return errCode; -- Gitee