diff --git a/server/src/main/java/datart/server/job/SchemaSyncJob.java b/server/src/main/java/datart/server/job/SchemaSyncJob.java index 93bcf6fde2417a45d2558af919aa3e380399b1c3..34e51706caf358959381fad74b90cbda896b1e1e 100644 --- a/server/src/main/java/datart/server/job/SchemaSyncJob.java +++ b/server/src/main/java/datart/server/job/SchemaSyncJob.java @@ -100,28 +100,30 @@ public class SchemaSyncJob implements Job, Closeable { } private boolean upsertSchemaInfo(String sourceId, List schemaItems) { - TransactionStatus transaction = TransactionHelper.getTransaction(TransactionDefinition.PROPAGATION_REQUIRES_NEW, TransactionDefinition.ISOLATION_REPEATABLE_READ); - try { - SourceSchemasMapperExt mapper = Application.getBean(SourceSchemasMapperExt.class); - SourceSchemas sourceSchemas = mapper.selectBySource(sourceId); - if (sourceSchemas == null) { - sourceSchemas = new SourceSchemas(); - sourceSchemas.setId(UUIDGenerator.generate()); - sourceSchemas.setSourceId(sourceId); - sourceSchemas.setUpdateTime(new Date()); - sourceSchemas.setSchemas(OBJECT_MAPPER.writeValueAsString(schemaItems)); - mapper.insert(sourceSchemas); - } else { - sourceSchemas.setUpdateTime(new Date()); - sourceSchemas.setSchemas(OBJECT_MAPPER.writeValueAsString(schemaItems)); - mapper.updateByPrimaryKey(sourceSchemas); + synchronized (sourceId.intern()) { + TransactionStatus transaction = TransactionHelper.getTransaction(TransactionDefinition.PROPAGATION_REQUIRES_NEW, TransactionDefinition.ISOLATION_REPEATABLE_READ); + try { + SourceSchemasMapperExt mapper = Application.getBean(SourceSchemasMapperExt.class); + SourceSchemas sourceSchemas = mapper.selectBySource(sourceId); + if (sourceSchemas == null) { + sourceSchemas = new SourceSchemas(); + sourceSchemas.setId(UUIDGenerator.generate()); + sourceSchemas.setSourceId(sourceId); + sourceSchemas.setUpdateTime(new Date()); + sourceSchemas.setSchemas(OBJECT_MAPPER.writeValueAsString(schemaItems)); + mapper.insert(sourceSchemas); + } else { + sourceSchemas.setUpdateTime(new Date()); + sourceSchemas.setSchemas(OBJECT_MAPPER.writeValueAsString(schemaItems)); + mapper.updateByPrimaryKey(sourceSchemas); + } + TransactionHelper.commit(transaction); + return true; + } catch (Exception e) { + TransactionHelper.rollback(transaction); + log.error("source schema parse error ", e); + return false; } - TransactionHelper.commit(transaction); - return true; - } catch (Exception e) { - TransactionHelper.rollback(transaction); - log.error("source schema parse error ", e); - return false; } }