Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
227 views
in Technique[技术] by (71.8m points)

java - Calling methods in two different ReactiveMongoRepository's in a transaction using Spring Data MongoDB?

When using the reactive programming model with Spring Data MongoDB it's possible to execute transactions like this:

Mono<DeleteResult> result = template.inTransaction()                                      
    .execute(action -> action.remove(query(where("id").is("step-1")), Step.class)); 

But Spring Data MongoDB also has support for "reactive repositories", for example:

public interface PersonRepository extends ReactiveMongoRepository<Person, String>

  Flux<Person> findByLocationNear(Point location, Distance distance);
}

and

public interface CarRepository extends ReactiveMongoRepository<Car, String>

  Flux<Car> findByYear(int year);
}

My question is, given that you have ReactiveMongoRepository's, can you somehow leverage MongoDB transactions and e.g. insert both a Person and Car in the same transaction (using PersonRepository and CarRepository in the case)? If so, how do you do this?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

I had also been trying hard to find solution for the Transactional support in Reactive style of Mongo DB & Spring Boot

But luckily I figured it myself. Though few of the things from google were also helpful but those were non reactive.

Important Note - For Spring boot 2.2.x it works well, but with spring boot 2.3.x it has some other issues, it has internal re-write & changes all together

  • You need to use ReactiveMongoTransactionManager along with ReactiveMongoDatabaseFactory, most of the details at the end, also sharing the code repo for the same

  • For getting the mongo db to support the Transactions we need to make sure that the DB should be running in replica mode.

    Why we need that? Because you will get some error like this otherwise:-

    Sessions are not supported by the MongoDB cluster to which this client is connected

The instructions for the same are below:-

  1. run the docker-compose based mongo db server using docker-compose.yml as shared below:-
version: "3"
services:
    mongo:
        hostname: mongo
        container_name: localmongo_docker
        image: mongo
        expose:
          - 27017
        ports:
          - 27017:27017
        restart: always
        entrypoint: [ "/usr/bin/mongod", "--bind_ip_all", "--replSet", "rs0" ]
        volumes:
          - ./mongodata:/data/db # need to create a docker volume named as mongodata first

  1. After the image comes up, execute the command(here localmongo_docker is the name of the container):-
docker exec -it localmongo_docker mongo
  1. Copy and paste the command below and execute that
rs.initiate(
   {
     _id : 'rs0',
     members: [
       { _id : 0, host : "mongo:27017" }
     ]
   }
 )
  1. And then exit the execution by entering exit

Important - The code repo can be found here on my github - https://github.com/krnbr/mongo-spring-boot-template

Important notes for the code are as below:-

  • MongoConfiguration class in the config package is the important part to make the transactions working, link to the configuration class is here

  • Main part is the Bean

     @Bean
     ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) {
         return new ReactiveMongoTransactionManager(dbFactory);
     }
    
  • For checking the working of the code's Transactional requirement you may go through the class UserService in service package here

Code shared in case the links do not work for someone:-

The Configuration and inside the Beans

@Configuration
public class MongoConfiguration extends AbstractMongoClientConfiguration {

    @Autowired
    private MongoProperties mongoProperties;

    @Bean
    ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) {
        return new ReactiveMongoTransactionManager(dbFactory);
    }

    @Override
    protected String getDatabaseName() {
        return mongoProperties.getDatabase();
    }

    @Override
    public MongoClient mongoClient() {
        return MongoClients.create(mongoProperties.getUri());
    }
}

application.properties (related to mongo db)

spring.data.mongodb.database=mongo
spring.data.mongodb.uri=mongodb://localhost:27017/mongo?replicaSet=rs0

Document Classes

Role Class

@Getter
@Setter
@Accessors(chain = true)
@Document(collection = "roles")
@TypeAlias("role")
public class Role implements Persistable<String> {

    @Id
    private String id;

    @Field("role_name")
    @Indexed(unique = true)
    private String role;

    @CreatedDate
    private ZonedDateTime created;

    @LastModifiedDate
    private ZonedDateTime updated;

    private Boolean deleted;

    private Boolean enabled;

    @Override
    @JsonIgnore
    public boolean isNew() {
        if(getCreated() == null)
            return true;
        else
            return false;
    }
}

User Class

@Getter
@Setter
@Accessors(chain = true)
@Document(collection = "users")
@JsonInclude(JsonInclude.Include.NON_NULL)
@TypeAlias("user")
public class User implements Persistable<String> {

    @Id()
    private String id;

    @Field("username")
    @Indexed(unique = true)
    @JsonProperty("username")
    private String userName;

    @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
    private String password;

    @CreatedDate
    private ZonedDateTime created;

    @LastModifiedDate
    private ZonedDateTime updated;

    private Boolean deleted;

    private Boolean enabled;

    @DBRef(lazy = true)
    @JsonProperty("roles")
    private List<Role> roles = new ArrayList();

    @Override
    @JsonIgnore
    public boolean isNew() {
        if(getCreated() == null)
            return true;
        else
            return false;
    }
}

UserProfile Class

@Getter
@Setter
@Accessors(chain = true)
@Document(collection = "user_profiles")
@JsonInclude(JsonInclude.Include.NON_NULL)
@TypeAlias("user_profile")
public class UserProfile implements Persistable<String> {

    @Id
    private String id;

    @Indexed(unique = true)
    private String mobile;

    @Indexed(unique = true)
    private String email;

    private String address;

    private String firstName;

    private String lastName;

    @DBRef
    private User user;

    @CreatedDate
    private ZonedDateTime created;

    @LastModifiedDate
    private ZonedDateTime updated;

    private Boolean deleted;

    private Boolean enabled;

    @Override
    @JsonIgnore
    public boolean isNew() {
        if(getCreated() == null)
            return true;
        else
            return false;
    }

}

ReactiveMongoRepository Interface(s)

RoleRepository

public interface RoleRepository extends ReactiveMongoRepository<Role, String> {

    Mono<Role> findByRole(String role);

    Flux<Role> findAllByRoleIn(List<String> roles);

}

UserRepository

public interface UserRepository extends ReactiveMongoRepository<User, String> {

    Mono<User> findByUserName(String userName);

}

UserProfileRepository

public interface UserProfileRepository extends ReactiveMongoRepository<UserProfile, String> {
}

The User Service Class Need to create your own RuntimeException Class here, here it is AppRuntimeException Class, I had been using

@Slf4j
@Service
public class UserService {

    @Autowired
    private RoleRepository roleRepository;

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private UserProfileRepository userProfileRepository;

    @Transactional
    public Mono<UserProfile> saveUserAndItsProfile(final UserRequest userRequest) {

        Mono<Role> roleMono = roleRepository.findByRole("USER");

        Mono<User> userMono = roleMono.flatMap(r -> {
            User user = new User()
                    .setUserName(userRequest.getUsername())
                    .setPassword(userRequest.getPassword());
            user.setRoles(Arrays.asList(r));
            return userRepository.save(user);
        }).onErrorResume(ex -> {
            log.error(ex.getMessage());
            if(ex instanceof DuplicateKeyException) {
                String errorMessage = "The user with the username '"+userRequest.getUsername()+"' already exists";
                log.error(errorMessage);
                return Mono.error(new AppRuntimeException(errorMessage, ErrorCodes.CONFLICT, ex));
            }
            return Mono.error(new AppRuntimeException(ex.getMessage(), ErrorCodes.INTERNAL_SERVER_ERROR, ex));
        });

        Mono<UserProfile> userProfileMono = userMono.flatMap(u -> {
            UserProfile userProfile = new UserProfile()
                    .setAddress(userRequest.getAddress())
                    .setEmail(userRequest.getEmail())
                    .setMobile(userRequest.getMobile())
                    .setUser(u);
            return userProfileRepository.save(userProfile);
        }).onErrorResume(ex -> {
            log.error(ex.getMessage());
            if(ex instanceof DuplicateKeyException) {
                String errorMessage = "The user with the profile mobile'"+userRequest.getMobile()+"' and/or - email '"+userRequest.getEmail()+"' already exists";
                log.error(errorMessage);
                return Mono.error(new AppRuntimeException(errorMessage, ErrorCodes.CONFLICT, ex));
            }
            return Mono.error(new AppRuntimeException(ex.getMessage(), ErrorCodes.INTERNAL_SERVER_ERROR, ex));
        });

        return userProfileMono;

    }

}

Controller and the Model Class

UserRequest Model Class

@Getter
@Setter
@Accessors(chain = true)
@Slf4j
@JsonInclude(JsonInclude.Include.NON_NULL)
public class UserRequest {

    private String username;
    private String password;
    private String mobile;
    private String email;
    private String address;
    private String firstName;
    private String lastName;

}

UserProfileApisController class

@Slf4j
@RestController
@RequestMapping("/apis/user/profile")
public class UserProfileApisController {

    @Autowired
    private UserService userService;

    @PostMapping
    public Mono<UserProfile> saveUserProfile(final @RequestBody UserRequest userRequest) {
        return userService.saveUserAndItsProfile(userRequest);
    }

}

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...