# flink-spring **Repository Path**: OSS-STUDIO/flink-spring ## Basic Information - **Project Name**: flink-spring - **Description**: https://github.com/getindata/flink-spring 镜像 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: main - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2023-11-20 - **Last Updated**: 2023-11-20 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # flink-spring With this library you can build Flink jobs using Spring dependency injection framework. Flink jobs can be build and set up using well known Spring mechanisms for dependency injection making the implementation more clean, efficient and portable. The goal of this library is **NOT** to run entire Flink job within Spring context. Instead, we provide you with a helper classes that can be used in your Flink job to create Spring context based on your Spring configuration classes and use this context to set up your pipeline. This can be done both on Job Manager while processing `main` method (job submission phase) and also on Task Managers, for example in `RichFunction::open` method. The created Spring context is short-lived. This library provides all core Spring dependencies like: - spring-context - spring-beans - spring-core - spring-expression - spring-aop It is based on Spring version **5.3.27,** and it is compiled using Java **11**. # How it works The `flink-spring` library apart from providing Spring dependencies also provides a utility/registry `ContextRegistry` class. This class has an API that allows you to load Spring context. API Usage: ```java DataStreamJob dataSteamJob = new DataStreamJob(); dataSteamJob = new ContextRegistry().autowiredBean(dataSteamJob, "org.example.config"); ``` By calling `new ContextRegistry().autowiredBean(new DataStreamJob(), "org.example.config")` two things have happened: 1. The Spring context was created based on Spring configuration classes from `org.example.config` package. 2. All fields marked as `@Autowired` in `DataStreamJob` instance were injected by Spring. Additionally, the created Spring context was added to `ContextRegistry` instance scope registry. thanks to this we can avoid recreating the context for every `.autowiredBean(...)` call. # Usage in your code 1. Clone the repository and build it using ```shell mvn clean install ``` 2. Copy created artifact `target/flink-spring-0.1.0-SNAPSHOT-jar-with-dependencies.jar` to `lib` folder of your Flink's distribution. Restart the cluster. 3. In your Flink job `pom.xml` add: ```xml com.getindata flink-spring 0.1.0-SNAPSHOT provided ``` Mind that the `scope` is set to `provided`. We don't want to include `flink-spring` lib in our job's uber jar. # Example In this example: - sink is injected by Spring - Source is not injected by Spring (it could) but instead we are passing a `EventProducer` object to its constructor. What we want to show here is that both, Flink components (Sources, Sinks etc.) as well as business code (`EventProducer`) can be injected by this library. For now, more detailed example can be found [here](https://github.com/kristoffSC/flink-using-springDI). #### The main class This class will have all its dependencies marked as `@Autowired` injected by Spring based on configuration classes located in `org.example.config` package. ```java package org.example; import com.getindata.fink.spring.context.ContextRegistry; import org.springframework.beans.factory.annotation.Autowired; /* other imports omitted for clarity. */ public class DataStreamJob { // Will be injected by Spring based on Spring context configuration. @Autowired private EventProducer eventProducer; // Will be injected by Spring based on Spring context configuration. @Autowired private SinkFunction sink; public static void main(String[] args) throws Exception { // Using flink-spring library to inject DataStreamJob.class dependencies that are marked as // @Autowired. new ContextRegistry() .autowiredBean(new DataStreamJob(), "org.example.config") .run(args); } private void run(String[] args) throws Exception { StreamExecutionEnvironment env = createStreamEnv(); env.addSource(new CheckpointCountingSource<>(5, 5, eventProducer)) .setParallelism(1) .process(new FlinkBusinessLogic()) .setParallelism(2) .addSink(sink) // sink will be injected by Spring .setParallelism(2); env.execute("Flink Job Powered By Spring DI."); } private static StreamExecutionEnvironment createStreamEnv() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); env.setRuntimeMode(RuntimeExecutionMode.STREAMING); env.enableCheckpointing(3000, CheckpointingMode.EXACTLY_ONCE); return env; } } ``` #### Configuration classes This is a Spring configuration class that can be used for loading Spring context by `flink-spring` library. ```java package org.example.config; // Spring libraries comes from flink-spring library import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /* other imports omitted for clarity. */ @Configuration public class JobSpringConfig { @Bean public EventToStringConverter converter() { return event -> String.format("Order Details - %s", event.toString()); } @Bean public SinkFunction sink(EventToStringConverter converter) { return new ConsoleSink<>(converter); } @Bean public EventProducer eventProducer() { return new OrderProducer(); } @Bean public SessionManager sessionManager() { return new SimpleSessionManager(); } @Bean public OrderProcessor orderProcessor(SessionManager sessionManager) { return new BusinessOrderProcessor( List.of(new SideNameAnonymization()), new OrderSessionize(sessionManager) ); } } ```