diff --git a/frameworks/libs/distributeddb/common/include/cloud/cloud_db_constant.h b/frameworks/libs/distributeddb/common/include/cloud/cloud_db_constant.h index 5d10c2c4fedd32e9743727f1475dede6010e884d..d4981de8c43530f684442a52a846cca744e17f75 100644 --- a/frameworks/libs/distributeddb/common/include/cloud/cloud_db_constant.h +++ b/frameworks/libs/distributeddb/common/include/cloud/cloud_db_constant.h @@ -107,6 +107,7 @@ public: static constexpr std::chrono::milliseconds ASYNC_GEN_LOG_INTERVAL = std::chrono::milliseconds(20); static constexpr std::chrono::milliseconds LONG_TIME_TRANSACTION = std::chrono::milliseconds(1000); + static constexpr std::chrono::milliseconds LONG_TRANSACTION_INTERVAL = std::chrono::milliseconds(50); // change local flag to :keep async download asset[0x1000] // mark local last write[0x02] diff --git a/frameworks/libs/distributeddb/interfaces/src/relational/relational_sync_able_storage.h b/frameworks/libs/distributeddb/interfaces/src/relational/relational_sync_able_storage.h index eba410cff8822d59fdcde678dbdb4d1d6b252d59..b9abe1cc63fb2abf9a7b99e7d238a566635aa243 100644 --- a/frameworks/libs/distributeddb/interfaces/src/relational/relational_sync_able_storage.h +++ b/frameworks/libs/distributeddb/interfaces/src/relational/relational_sync_able_storage.h @@ -281,6 +281,10 @@ public: int WaitAsyncGenLogTaskFinished(const std::vector &tables) override; int ResetGenLogTaskStatus(); + +#ifdef USE_DISTRIBUTEDDB_CLOUD + int AgingCloudNoneExistRecord(const std::string &tableName) override; +#endif protected: int FillReferenceData(CloudSyncData &syncData); @@ -359,6 +363,14 @@ private: int StartTransactionForAsyncDownload(TransactType type); +#ifdef USE_DISTRIBUTEDDB_CLOUD + int GetOneBatchCloudNoneExistRecord(const std::string &tableName, const std::string &dataPk, + std::vector &records); + + int DeleteOneBatchCloudNoneExistRecord(const std::string &tableName, ChangedData &changedData, + const std::vector &records); +#endif + // data std::shared_ptr storageEngine_ = nullptr; std::function onSchemaChanged_; diff --git a/frameworks/libs/distributeddb/storage/src/relational/relational_sync_able_storage_extend.cpp b/frameworks/libs/distributeddb/storage/src/relational/relational_sync_able_storage_extend.cpp index 82aff8809af06c91c0b2820941bf0996760b30e2..04f102e8939d245c99fdbc44f2b0980fd8d97e5a 100644 --- a/frameworks/libs/distributeddb/storage/src/relational/relational_sync_able_storage_extend.cpp +++ b/frameworks/libs/distributeddb/storage/src/relational/relational_sync_able_storage_extend.cpp @@ -723,5 +723,138 @@ int RelationalSyncAbleStorage::ResetGenLogTaskStatus() storageEngine_->ResetGenLogTaskStatus(); return E_OK; } + +#ifdef USE_DISTRIBUTEDDB_CLOUD +int RelationalSyncAbleStorage::AgingCloudNoneExistRecord(const std::string &tableName) +{ + if (storageEngine_ == nullptr) { + LOGE("[AgingCloudNoneExistRecord] Storage is null"); + return -E_INVALID_DB; + } + TableInfo tableInfo = storageEngine_->GetSchema().GetTable(tableName); + bool isUseRowid = tableInfo.IsNoPkTable() || tableInfo.IsMultiPkTable(); + std::string dataPk = isUseRowid ? "_rowid_" : tableInfo.GetPrimaryKey().at(0); + + int errCode = E_OK; + uint16_t loopTime = 0; + do { + loopTime++; + std::vector records; + ChangedData changedData; + changedData.type = ChangedDataType::DATA; + changedData.tableName = tableName; + changedData.field.push_back(dataPk); + errCode = GetOneBatchCloudNoneExistRecord(tableName, dataPk, records); + if (errCode != E_OK) { + LOGE("[AgingCloudNoneExistRecord] get one batch cloud none exist record failed.%d, tableName:%s", errCode, + DBCommon::StringMiddleMaskingWithLen(tableName).c_str()); + return errCode; + } + errCode = DeleteOneBatchCloudNoneExistRecord(tableName, changedData, records); + if (errCode == -E_FINISHED) { + break; + } + if (errCode != E_OK) { + LOGE("[AgingCloudNoneExistRecord] delete one batch cloud none exist record failed.%d, tableName:%s", + errCode, DBCommon::StringMiddleMaskingWithLen(tableName).c_str()); + return errCode; + } + TriggerObserverAction("CLOUD", std::move(changedData), true); + if (loopTime >= UINT16_MAX) { + LOGW("[AgingCloudNoneExistRecord] there is too much data that not exist in the cloud, about to exit"); + break; + } + std::this_thread::sleep_for(CloudDbConstant::LONG_TRANSACTION_INTERVAL); + } while (true); + return E_OK; +} + +int RelationalSyncAbleStorage::GetOneBatchCloudNoneExistRecord(const std::string &tableName, const std::string &dataPk, + std::vector &records) +{ + int errCode = E_OK; + auto *readHandle = static_cast( + storageEngine_->FindExecutor(false, OperatePerm::NORMAL_PERM, errCode)); + if (readHandle == nullptr) { + return errCode; + } + errCode = readHandle->StartTransaction(TransactType::DEFERRED); + if (errCode != E_OK) { + ReleaseHandle(readHandle); + return errCode; + } + sqlite3 *db = nullptr; + errCode = readHandle->GetDbHandle(db); + if (errCode != E_OK) { + readHandle->Rollback(); + ReleaseHandle(readHandle); + return errCode; + } + errCode = SQLiteRelationalUtils::GetOneBatchCloudNotExistRecord(tableName, db, records, dataPk); + if (errCode != E_OK) { + LOGE("[GetOneBatchCloudNoneExistRecord] get cloud none exist record failed.%d, tableName:%s", errCode, + DBCommon::StringMiddleMaskingWithLen(tableName).c_str()); + readHandle->Rollback(); + ReleaseHandle(readHandle); + return errCode; + } + readHandle->Commit(); + ReleaseHandle(readHandle); + return errCode; +} + +int RelationalSyncAbleStorage::DeleteOneBatchCloudNoneExistRecord(const std::string &tableName, + ChangedData &changedData, const std::vector &records) +{ + auto start = std::chrono::steady_clock::now(); + int errCode = E_OK; + auto *writeHandle = static_cast( + storageEngine_->FindExecutor(true, OperatePerm::NORMAL_PERM, errCode)); + if (writeHandle == nullptr) { + return errCode; + } + errCode = writeHandle->StartTransaction(TransactType::IMMEDIATE); + if (errCode != E_OK) { + ReleaseHandle(writeHandle); + return errCode; + } + ResFinalizer finalizer([this, errCode, writeHandle] { + auto handle = writeHandle; + (errCode != E_OK && errCode != -E_FINISHED) ? handle->Rollback() : handle->Commit(); + ReleaseHandle(handle); + }); + sqlite3 *db = nullptr; + errCode = writeHandle->GetDbHandle(db); + if (errCode != E_OK) { + return errCode; + } + if (records.empty()) { + errCode = SQLiteRelationalUtils::DropTempTable(tableName, db); + if (errCode != E_OK) { + LOGE("[DeleteOneBatchCloudNoneExistRecord] drop temp table failed:%d, tableName:%s", errCode, + DBCommon::StringMiddleMaskingWithLen(tableName).c_str()); + } + return errCode == E_OK ? -E_FINISHED : errCode; + } + std::vector dataVec; + for (auto &record : records) { + errCode = SQLiteRelationalUtils::DeleteOneRecord(tableName, db, record, logicDelete_); + if (errCode != E_OK) { + LOGE("[DeleteOneBatchCloudNoneExistRecord] delete cloud not exist record failed.%d, tableName:%s", errCode, + DBCommon::StringMiddleMaskingWithLen(tableName).c_str()); + return errCode; + } + dataVec.emplace_back(record.pkValue); + auto duration = std::chrono::steady_clock::now() - start; + if (duration > CloudDbConstant::LONG_TIME_TRANSACTION) { + LOGI("[DeleteOneBatchCloudNoneExistRecord] delete one batch cloud not exist record cost %" PRId64 "ms.", + duration.count()); + break; + } + } + changedData.primaryData[ChangeType::OP_DELETE].push_back(dataVec); + return E_OK; +} +#endif } #endif diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.cpp index 0149f26f08c3a2ec3e6e1eecb0864d12e8099049..daf974671983db7a105e80b5a310ea36d6ebc2f6 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.cpp +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.cpp @@ -1256,5 +1256,74 @@ void SQLiteRelationalUtils::FillSyncInfo(const CloudSyncOption &option, const Sy info.prepareTraceId = option.prepareTraceId; info.asyncDownloadAssets = option.asyncDownloadAssets; } + +int SQLiteRelationalUtils::GetOneBatchCloudNotExistRecord(const std::string &tableName, sqlite3 *db, + std::vector &records, const std::string &dataPk) +{ + std::string sql = "SELECT a._rowid_, a.data_key, b." + dataPk + ", a.flag FROM " + + DBCommon::GetLogTableName(tableName) + " AS a LEFT JOIN " + tableName + " AS b ON (a.data_key = b._rowid_) " + "WHERE a.cloud_gid IS NOT '' AND a.cloud_gid NOT IN (SELECT cloud_gid FROM naturalbase_rdb_aux_" + tableName + + "_log_tmp) limit 100;"; + sqlite3_stmt *stmt = nullptr; + int errCode = SQLiteUtils::GetStatement(db, sql, stmt); + if (errCode != E_OK) { + LOGE("[RDBUtils][GetOneBatchCloudNotExistRecord] Get statement failed, %d.", errCode); + return errCode; + } + + do { + errCode = SQLiteUtils::StepWithRetry(stmt); + if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { + CloudNotExistRecord record; + record.logRowid = sqlite3_column_int64(stmt, 0); // col 0 is _rowid_ of log table + record.dataRowid = sqlite3_column_int64(stmt, 1); // col 1 is _rowid_ of data table + errCode = GetTypeValByStatement(stmt, 2, record.pkValue); // col 2 is pk of data table + if (errCode != E_OK) { + LOGE("[RDBUtils][GetOneBatchCloudNotExistRecord] Get pk value failed, errCode = %d.", errCode); + break; + } + record.flag = sqlite3_column_int(stmt, 3); // col 3 is flag + records.push_back(record); + } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { + errCode = E_OK; + break; + } else { + LOGE("[RDBUtils][GetOneBatchCloudNotExistRecord] Step failed, errCode = %d.", errCode); + break; + } + } while (true); + + return SQLiteUtils::ProcessStatementErrCode(stmt, true, errCode); +} + +int SQLiteRelationalUtils::DeleteOneRecord(const std::string &tableName, sqlite3 *db, const CloudNotExistRecord &record, + bool isLogicDelete) +{ + if ((record.flag & static_cast(LogInfoFlag::FLAG_LOCAL)) != 0) { + std::string sql = "UPDATE " + DBCommon::GetLogTableName(tableName) + + " SET cloud_gid = '' AND version = '' where _rowid_ = " + std::to_string(record.logRowid) + ";"; + return SQLiteUtils::ExecuteRawSQL(db, sql); + } + + if ((record.flag & static_cast(LogInfoFlag::FLAG_CLOUD_WRITE)) != 0) { + std::string sql = "DELETE FROM " + tableName + " WHERE _rowid_ = " + std::to_string(record.dataRowid) + ";"; + int errCode = SQLiteUtils::ExecuteRawSQL(db, sql); + if (errCode != E_OK) { + return errCode; + } + int32_t newFlag = isLogicDelete ? ((record.flag & (0x20 | 0x1 | 0x8)) & (~0x4000)) : + ((record.flag & (0x20 | 0x1)) & (~0x4000)); + sql = "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET cloud_gid = '' AND version = '' AND flag = " + + std::to_string(newFlag) + " where _rowid_ = " + std::to_string(record.logRowid) + ";"; + return SQLiteUtils::ExecuteRawSQL(db, sql); + } + return E_OK; +} + +int SQLiteRelationalUtils::DropTempTable(const std::string &tableName, sqlite3 *db) +{ + std::string sql = "DROP TABLE IF EXISTS " + DBCommon::GetLogTableName(tableName) + "_temp"; + return SQLiteUtils::ExecuteRawSQL(db, sql); +} #endif } // namespace DistributedDB \ No newline at end of file diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.h b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.h index 60f61115ead3310d084ae7f6f12320c4e963f5bf..bf395b18d25e625f1629dcedd6c8d45b96e787d1 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.h +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.h @@ -150,6 +150,21 @@ public: #ifdef USE_DISTRIBUTEDDB_CLOUD static void FillSyncInfo(const CloudSyncOption &option, const SyncProcessCallback &onProcess, ICloudSyncer::CloudTaskInfo &info); + + struct CloudNotExistRecord { + int64_t logRowid = 0; + int64_t dataRowid = 0; + int32_t flag = 0; + Type pkValue; + }; + + static int GetOneBatchCloudNotExistRecord(const std::string &tableName, sqlite3 *db, + std::vector &records, const std::string &dataPk); + + static int DeleteOneRecord(const std::string &tableName, sqlite3 *db, const CloudNotExistRecord &record, + bool isLogicDelete); + + static int DropTempTable(const std::string &tableName, sqlite3 *db); #endif private: static int BindExtendStatementByType(sqlite3_stmt *statement, int cid, Type &typeVal);