# flutter-quill **Repository Path**: foreachlife/flutter-quill ## Basic Information - **Project Name**: flutter-quill - **Description**: flutter-quill - **Primary Language**: Dart - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2022-01-17 - **Last Updated**: 2023-08-22 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ``` /* * Copyright (c) 2023 Airbyte, Inc., all rights reserved. */ package io.airbyte.integrations.destination.obs; import com.fasterxml.jackson.databind.JsonNode; import com.obs.services.ObsClient; import io.airbyte.integrations.BaseConnector; import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.integrations.destination.s3.util.S3NameTransformer; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.function.Consumer; public abstract class BaseObsDestination extends BaseConnector implements Destination { private static final Logger LOGGER = LoggerFactory.getLogger(BaseObsDestination.class); protected final ObsDestinationConfigFactory configFactory; private final NamingConventionTransformer nameTransformer; protected BaseObsDestination() { this(new ObsDestinationConfigFactory()); } protected BaseObsDestination(final ObsDestinationConfigFactory configFactory) { this.configFactory = configFactory; this.nameTransformer = new S3NameTransformer(); } @Override public AirbyteConnectionStatus check(JsonNode config) throws Exception { try { ObsDestinationConfig obsDestinationConfig = configFactory.getObsDestinationConfig(config); ObsClient obsClient = obsDestinationConfig.getOBSClient(); ObsBaseChecks.testIAMUserHasListObjectPermission(obsClient, obsDestinationConfig.getBucket()); ObsBaseChecks.testSingleUpload(obsClient, obsDestinationConfig.getBucket(), obsDestinationConfig.getBucketPath()); ObsBaseChecks.testMultipartUpload(obsClient, obsDestinationConfig.getBucket(), obsDestinationConfig.getBucketPath()); return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED); } catch (final Exception e) { LOGGER.error("Exception attempting to access the S3 bucket: ", e); return new AirbyteConnectionStatus() .withStatus(AirbyteConnectionStatus.Status.FAILED) .withMessage("Could not connect to the S3 bucket with the provided configuration. \n" + e .getMessage()); } } @Override public AirbyteMessageConsumer getConsumer(JsonNode config, ConfiguredAirbyteCatalog catalog, Consumer outputRecordCollector) throws Exception { final ObsDestinationConfig obsConfig = configFactory.getObsDestinationConfig(config); return null; // return new ObsConsumerFactory().create( // outputRecordCollector, // new S3StorageOperations(nameTransformer, obsConfig.getOBSClient(), obsConfig), // nameTransformer, // SerializedBufferFactory.getCreateFunction(obsConfig, FileBuffer::new), // obsConfig, // catalog); } } ``` ``` package io.airbyte.integrations.destination.obs; import com.obs.services.ObsClient; import com.obs.services.model.InitiateMultipartUploadRequest; import com.obs.services.model.InitiateMultipartUploadResult; import com.obs.services.model.ObjectMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.ByteArrayInputStream; import java.io.IOException; public class ObsBaseChecks { private static final Logger LOGGER = LoggerFactory.getLogger(ObsBaseChecks.class); public static void testIAMUserHasListObjectPermission(final ObsClient obsClient, final String bucketName) { LOGGER.info("Started testing if IAM user can call listObjects on the destination bucket"); obsClient.listObjects(bucketName); LOGGER.info("Finished checking for listObjects permission"); } public static void testSingleUpload(final ObsClient obsClient, final String bucketName, final String bucketPath) { LOGGER.info("Started testing if all required credentials assigned to user for single file uploading"); final var prefix = bucketPath.endsWith("/") ? bucketPath : bucketPath + "/"; final String testFile = prefix + "test_" + System.currentTimeMillis(); try { obsClient.putObject(bucketName, "test", new ByteArrayInputStream(testFile.getBytes())); } finally { obsClient.deleteObject(bucketName, testFile); } LOGGER.info("Finished checking for normal upload mode"); } public static void testMultipartUpload(final ObsClient obsClient, final String bucketName, final String bucketPath) throws IOException { LOGGER.info("Started testing if all required credentials assigned to user for multipart upload"); InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(bucketName, "testMultipart"); ObjectMetadata metadata = new ObjectMetadata(); metadata.addUserMetadata("property", "property-value"); metadata.setContentType("text/plain"); request.setMetadata(metadata); InitiateMultipartUploadResult result = obsClient.initiateMultipartUpload(request); String uploadId = result.getUploadId(); obsClient.deleteObject(bucketName, "testMultipart"); LOGGER.info("Finished verification for multipart upload mode: {}", uploadId); } public static boolean testCustomEndpointSecured(final String endpoint) { // if user does not use a custom endpoint, do not fail if (endpoint == null || endpoint.length() == 0) { return true; } else { return endpoint.startsWith("https://"); } } } ``` ``` package io.airbyte.integrations.destination.obs; public class ObsConstants { public static final String OBS_BUCKET_PATH = "base_path"; public static final String OBS_FILE_NAME_PATTERN = "file_name_pattern"; public static final String OBS_ENDPOINT = "endpoint"; public static final String OBS_ACCESS_KEY = "access_key"; public static final String OBS_SECRET_KEY = "secret_key"; public static final String OBS_BUCKET = "bucket"; public static final String OBS_REGION = "region"; public static final String DEFAULT_PATH_FORMAT = "${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_"; } ``` ``` package io.airbyte.integrations.destination.obs; import com.google.common.base.Preconditions; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer; import io.airbyte.integrations.destination.buffered_stream_consumer.OnCloseFunction; import io.airbyte.integrations.destination.buffered_stream_consumer.OnStartFunction; import io.airbyte.integrations.destination.record_buffer.BufferCreateFunction; import io.airbyte.integrations.destination.record_buffer.FlushBufferFunction; import io.airbyte.integrations.destination.record_buffer.SerializedBufferingStrategy; import io.airbyte.integrations.destination.s3.BlobStorageOperations; import io.airbyte.integrations.destination.s3.WriteConfig; import io.airbyte.protocol.models.v0.*; import org.apache.commons.io.FileUtils; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.Map; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; public class ObsConsumerFactory { private static final Logger LOGGER = LoggerFactory.getLogger(ObsConsumerFactory.class); private static final DateTime SYNC_DATETIME = DateTime.now(DateTimeZone.UTC); public AirbyteMessageConsumer create(final Consumer outputRecordCollector, final BlobStorageOperations storageOperations, final NamingConventionTransformer namingResolver, final BufferCreateFunction onCreateBuffer, final ObsDestinationConfig obsConfig, final ConfiguredAirbyteCatalog catalog) { final List writeConfigs = createWriteConfigs(storageOperations, namingResolver, obsConfig, catalog); return new BufferedStreamConsumer( outputRecordCollector, onStartFunction(storageOperations, writeConfigs), new SerializedBufferingStrategy( onCreateBuffer, catalog, flushBufferFunction(storageOperations, writeConfigs, catalog)), onCloseFunction(storageOperations, writeConfigs), catalog, storageOperations::isValidData); } private static List createWriteConfigs(final BlobStorageOperations storageOperations, final NamingConventionTransformer namingResolver, final ObsDestinationConfig obsConfig, final ConfiguredAirbyteCatalog catalog) { return catalog.getStreams() .stream() .map(toWriteConfig(storageOperations, namingResolver, obsConfig)) .collect(Collectors.toList()); } private static AirbyteStreamNameNamespacePair toNameNamespacePair(final WriteConfig config) { return new AirbyteStreamNameNamespacePair(config.getStreamName(), config.getNamespace()); } private static Function toWriteConfig(final BlobStorageOperations storageOperations, final NamingConventionTransformer namingResolver, final ObsDestinationConfig obsDestinationConfig) { return stream -> { Preconditions.checkNotNull(stream.getDestinationSyncMode(), "Undefined destination sync mode"); final AirbyteStream abStream = stream.getStream(); final String namespace = abStream.getNamespace(); final String streamName = abStream.getName(); final String bucketPath = obsDestinationConfig.getBucketPath(); final String customOutputFormat = String.join("/", bucketPath, obsDestinationConfig.getPathFormat()); final String fullOutputPath = storageOperations.getBucketObjectPath(namespace, streamName, SYNC_DATETIME, customOutputFormat); final DestinationSyncMode syncMode = stream.getDestinationSyncMode(); final WriteConfig writeConfig = new WriteConfig(namespace, streamName, bucketPath, customOutputFormat, fullOutputPath, syncMode); LOGGER.info("Write config: {}", writeConfig); return writeConfig; }; } private FlushBufferFunction flushBufferFunction(final BlobStorageOperations storageOperations, final List writeConfigs, final ConfiguredAirbyteCatalog catalog) { final Map pairToWriteConfig = writeConfigs.stream() .collect(Collectors.toUnmodifiableMap( ObsConsumerFactory::toNameNamespacePair, Function.identity())); return (pair, writer) -> { LOGGER.info("Flushing buffer for stream {} ({}) to storage", pair.getName(), FileUtils.byteCountToDisplaySize(writer.getByteCount())); if (!pairToWriteConfig.containsKey(pair)) { throw new IllegalArgumentException( String.format("Message contained record from a stream %s that was not in the catalog. \ncatalog: %s", pair, Jsons.serialize(catalog))); } final WriteConfig writeConfig = pairToWriteConfig.get(pair); try (writer) { writer.flush(); writeConfig.addStoredFile(storageOperations.uploadRecordsToBucket( writer, writeConfig.getNamespace(), writeConfig.getStreamName(), writeConfig.getFullOutputPath())); } catch (final Exception e) { LOGGER.error("Failed to flush and upload buffer to storage:", e); throw new RuntimeException("Failed to upload buffer to storage", e); } }; } private OnStartFunction onStartFunction(final BlobStorageOperations storageOperations, final List writeConfigs) { return () -> { LOGGER.info("Preparing bucket in destination started for {} streams", writeConfigs.size()); for (final WriteConfig writeConfig : writeConfigs) { if (writeConfig.getSyncMode().equals(DestinationSyncMode.OVERWRITE)) { final String namespace = writeConfig.getNamespace(); final String stream = writeConfig.getStreamName(); final String outputBucketPath = writeConfig.getOutputBucketPath(); final String pathFormat = writeConfig.getPathFormat(); LOGGER.info("Clearing storage area in destination started for namespace {} stream {} bucketObject {} pathFormat {}", namespace, stream, outputBucketPath, pathFormat); storageOperations.cleanUpBucketObject(namespace, stream, outputBucketPath, pathFormat); LOGGER.info("Clearing storage area in destination completed for namespace {} stream {} bucketObject {}", namespace, stream, outputBucketPath); } } LOGGER.info("Preparing storage area in destination completed."); }; } private OnCloseFunction onCloseFunction(final BlobStorageOperations storageOperations, final List writeConfigs) { return (hasFailed) -> { if (hasFailed) { LOGGER.info("Cleaning up destination started for {} streams", writeConfigs.size()); for (final WriteConfig writeConfig : writeConfigs) { storageOperations.cleanUpBucketObject(writeConfig.getFullOutputPath(), writeConfig.getStoredFiles()); writeConfig.clearStoredFiles(); } LOGGER.info("Cleaning up destination completed."); } }; } } ``` ``` package io.airbyte.integrations.destination.obs; import com.google.common.annotations.VisibleForTesting; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.adaptive.AdaptiveDestinationRunner; public class ObsDestination extends BaseObsDestination { public ObsDestination() {} @VisibleForTesting public ObsDestination(final ObsDestinationConfigFactory s3DestinationConfigFactory) { super(s3DestinationConfigFactory); } public static void main(final String[] args) throws Exception { new IntegrationRunner(new ObsDestination()).run(args); AdaptiveDestinationRunner.baseOnEnv() .withOssDestination(ObsDestination::new) .withCloudDestination(io.airbyte.integrations.destination.obs.ObsDestinationStrictEncrypt::new) .run(args); } } ``` ``` package io.airbyte.integrations.destination.obs; import com.fasterxml.jackson.databind.JsonNode; import com.obs.services.ObsClient; import com.obs.services.internal.security.BasicSecurityKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; import static io.airbyte.integrations.destination.obs.ObsConstants.*; import static io.airbyte.integrations.destination.obs.ObsConstants.OBS_SECRET_KEY; public class ObsDestinationConfig { private static final Logger LOGGER = LoggerFactory.getLogger(ObsDestinationConfigFactory.class); private final String accessKey; private final String secretKey; private final String bucketPath; private final String bucket; private final String pathFormat; private final String endpoint; private final BasicSecurityKey basicSecurityKey; private final Object lock = new Object(); private ObsClient obsClient; public String getBucketPath() { return bucketPath; } public String getPathFormat() { return pathFormat; } public String getBucket() { return bucket; } public String getEndpoint() { return endpoint; } public ObsDestinationConfig(String accessKey, String secretKey, String bucketPath, String bucket, String pathFormat, String endpoint, BasicSecurityKey basicSecurityKey) { this.accessKey = accessKey; this.secretKey = secretKey; this.bucketPath = bucketPath; this.bucket = bucket; this.pathFormat = pathFormat; this.endpoint = endpoint; this.basicSecurityKey = basicSecurityKey; } @Nullable private static String getProperty(@Nonnull final JsonNode config, @Nonnull final String key) { final JsonNode node = config.get(key); if (node == null) { return null; } return node.asText(); } public static Builder create(final String bucketName, final String bucketPath, final String endpoint) { return new Builder(bucketName, bucketPath, endpoint); } public static ObsDestinationConfig getObsDestinationConfig(@Nonnull final JsonNode config) { Builder builder = create( getProperty(config, OBS_BUCKET), "", getProperty(config, OBS_ENDPOINT)); if (config.has(OBS_BUCKET_PATH)) { builder = builder.withBucketPath(config.get(OBS_BUCKET_PATH).asText()); } if (config.has(OBS_FILE_NAME_PATTERN)) { builder = builder.withFileNamePattern(config.get(OBS_FILE_NAME_PATTERN).asText()); } BasicSecurityKey basicSecurityKey = null; if (config.has(OBS_ACCESS_KEY) && config.has(OBS_SECRET_KEY)) { basicSecurityKey = new BasicSecurityKey(getProperty(config, OBS_ACCESS_KEY), getProperty(config, OBS_SECRET_KEY)); } builder = builder.withCredentialConfig(basicSecurityKey); return builder.get(); } public static class Builder { private String pathFormat = ObsConstants.OBS_FILE_NAME_PATTERN; private String bucket; private String bucketPath; private String endpoint; private BasicSecurityKey basicSecurityKey; private ObsClient obsClient; private String fileNamePattern; protected Builder(final String bucketName, final String bucketPath, final String endpoint) { this.bucket = bucketName; this.bucketPath = bucketPath; this.endpoint = endpoint; } public Builder withBucket(final String bucket) { this.bucket = bucket; return this; } public Builder withFileNamePattern(final String fileNamePattern) { this.fileNamePattern = fileNamePattern; return this; } public Builder withBucketPath(final String bucketPath) { this.bucketPath = bucketPath; return this; } public Builder withEndPoint(final String endpoint) { this.endpoint = endpoint; return this; } public Builder withPathFormat(final String pathFormat) { this.pathFormat = pathFormat; return this; } public Builder withCredentialConfig(final BasicSecurityKey basicSecurityKey) { this.basicSecurityKey = basicSecurityKey; return this; } public Builder withObsClient(final ObsClient obsClient) { this.obsClient = obsClient; return this; } public ObsDestinationConfig get() { return new ObsDestinationConfig(basicSecurityKey.getAccessKey(), basicSecurityKey.getSecretKey(), bucketPath, bucket, pathFormat, endpoint, basicSecurityKey); } } public ObsClient getOBSClient() { synchronized (lock) { if (obsClient == null) { return resetObsClient(); } return obsClient; } } ObsClient resetObsClient() { synchronized (lock) { if (obsClient != null) { try { obsClient.close(); } catch (IOException e) { e.printStackTrace(); } } } obsClient = createObsClient(); return obsClient; } protected ObsClient createObsClient() { LOGGER.info("Creating OBS client..."); return new ObsClient(basicSecurityKey.getAccessKey(), basicSecurityKey.getSecretKey(), getEndpoint()); } } ``` ``` package io.airbyte.integrations.destination.obs; import com.fasterxml.jackson.databind.JsonNode; public class ObsDestinationConfigFactory { public ObsDestinationConfig getObsDestinationConfig(final JsonNode config) { return ObsDestinationConfig.getObsDestinationConfig(config); } } ``` ``` /* * Copyright (c) 2023 Airbyte, Inc., all rights reserved. */ package io.airbyte.integrations.destination.obs; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status; public class ObsDestinationStrictEncrypt extends BaseObsDestination { public ObsDestinationStrictEncrypt() { super(); } @VisibleForTesting public ObsDestinationStrictEncrypt(final ObsDestinationConfigFactory configFactory) { super(configFactory); } @Override public AirbyteConnectionStatus check(final JsonNode config) throws Exception { final ObsDestinationConfig destinationConfig = this.configFactory.getObsDestinationConfig(config); // Fails early to avoid extraneous validations checks if custom endpoint is not secure if (!ObsBaseChecks.testCustomEndpointSecured(destinationConfig.getEndpoint())) { return new AirbyteConnectionStatus() .withStatus(Status.FAILED) .withMessage("Custom endpoint does not use HTTPS"); } return super.check(config); } } ```