在Spring Data MongoDB中使用反应式编程模型时,可以执行如下事务:
Mono<DeleteResult> result = template.inTransaction() .execute(action -> action.remove(query(where("id").is("step-1")), Step.class));
但是Spring Data MongoDB也支持“反应性存储库”,例如:
public interface PersonRepository extends ReactiveMongoRepository<Person, String> Flux<Person> findByLocationNear(Point location, Distance distance); }
和
public interface CarRepository extends ReactiveMongoRepository<Car, String> Flux<Car> findByYear(int year); }
我的问题是,假设您拥有,您是否ReactiveMongoRepository可以以某种方式利用MongoDB事务,例如将a Person和插入Car同一事务中(使用PersonRepository和CarRepository大小写)?如果是这样,您该怎么做?
ReactiveMongoRepository
Person
Car
PersonRepository
CarRepository
我也一直在努力寻找Mongo DB和Spring Boot的Reactive风格的 事务 支持解决方案
但是幸运的是我自己想通了。尽管Google提供的一些功能也没有帮助,但这些都是无反应的。
您需要使用 ReactiveMongoTransactionManager 沿 ReactiveMongoDatabaseFactory ,大部分的末尾的详细信息,也分享代码回购为同
为了使mongo数据库支持事务,我们需要确保数据库应以副本方式运行 。
为什么我们需要那个? 因为否则您会得到一些这样的错误:
此客户端连接到的MongoDB集群不支持会话
相同的说明如下:
version: "3" services: mongo: hostname: mongo container_name: localmongo_docker image: mongo expose: - 27017 ports: - 27017:27017 restart: always entrypoint: [ "/usr/bin/mongod", "--bind_ip_all", "--replSet", "rs0" ] volumes: - ./mongodata:/data/db # need to create a docker volume named as mongodata first
docker exec -it localmongo_docker mongo
rs.initiate( { _id : 'rs0', members: [ { _id : 0, host : "mongo:27017" } ] } )
重要 -代码仓库可以在我的github上找到-https: //github.com/krnbr/mongo-spring-boot- template
该代码的重要说明如下:-
@Bean ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) { return new ReactiveMongoTransactionManager(dbFactory); }
如果链接对某人无效,请共享代码:-
配置和Bean内部
@Configuration public class MongoConfiguration extends AbstractMongoClientConfiguration { @Autowired private MongoProperties mongoProperties; @Bean ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) { return new ReactiveMongoTransactionManager(dbFactory); } @Override protected String getDatabaseName() { return mongoProperties.getDatabase(); } @Override public MongoClient mongoClient() { return MongoClients.create(mongoProperties.getUri()); } }
application.properties (与mongo db有关)
spring.data.mongodb.database=mongo spring.data.mongodb.uri=mongodb://localhost:27017/mongo?replicaSet=rs0
文件类别
角色类别
@Getter @Setter @Accessors(chain = true) @Document(collection = "roles") @TypeAlias("role") public class Role implements Persistable<String> { @Id private String id; @Field("role_name") @Indexed(unique = true) private String role; @CreatedDate private ZonedDateTime created; @LastModifiedDate private ZonedDateTime updated; private Boolean deleted; private Boolean enabled; @Override @JsonIgnore public boolean isNew() { if(getCreated() == null) return true; else return false; } }
用户类别
@Getter @Setter @Accessors(chain = true) @Document(collection = "users") @JsonInclude(JsonInclude.Include.NON_NULL) @TypeAlias("user") public class User implements Persistable<String> { @Id() private String id; @Field("username") @Indexed(unique = true) @JsonProperty("username") private String userName; @JsonProperty(access = JsonProperty.Access.WRITE_ONLY) private String password; @CreatedDate private ZonedDateTime created; @LastModifiedDate private ZonedDateTime updated; private Boolean deleted; private Boolean enabled; @DBRef(lazy = true) @JsonProperty("roles") private List<Role> roles = new ArrayList(); @Override @JsonIgnore public boolean isNew() { if(getCreated() == null) return true; else return false; } }
UserProfile类别
@Getter @Setter @Accessors(chain = true) @Document(collection = "user_profiles") @JsonInclude(JsonInclude.Include.NON_NULL) @TypeAlias("user_profile") public class UserProfile implements Persistable<String> { @Id private String id; @Indexed(unique = true) private String mobile; @Indexed(unique = true) private String email; private String address; private String firstName; private String lastName; @DBRef private User user; @CreatedDate private ZonedDateTime created; @LastModifiedDate private ZonedDateTime updated; private Boolean deleted; private Boolean enabled; @Override @JsonIgnore public boolean isNew() { if(getCreated() == null) return true; else return false; } }
ReactiveMongoRepository接口
角色库
public interface RoleRepository extends ReactiveMongoRepository<Role, String> { Mono<Role> findByRole(String role); Flux<Role> findAllByRoleIn(List<String> roles); }
用户资料库
public interface UserRepository extends ReactiveMongoRepository<User, String> { Mono<User> findByUserName(String userName); }
UserProfileRepository
public interface UserProfileRepository extends ReactiveMongoRepository<UserProfile, String> { }
用户服务类 需要在这里创建自己的RuntimeException类,这里是AppRuntimeException类,我一直在使用
@Slf4j @Service public class UserService { @Autowired private RoleRepository roleRepository; @Autowired private UserRepository userRepository; @Autowired private UserProfileRepository userProfileRepository; @Transactional public Mono<UserProfile> saveUserAndItsProfile(final UserRequest userRequest) { Mono<Role> roleMono = roleRepository.findByRole("USER"); Mono<User> userMono = roleMono.flatMap(r -> { User user = new User() .setUserName(userRequest.getUsername()) .setPassword(userRequest.getPassword()); user.setRoles(Arrays.asList(r)); return userRepository.save(user); }).onErrorResume(ex -> { log.error(ex.getMessage()); if(ex instanceof DuplicateKeyException) { String errorMessage = "The user with the username '"+userRequest.getUsername()+"' already exists"; log.error(errorMessage); return Mono.error(new AppRuntimeException(errorMessage, ErrorCodes.CONFLICT, ex)); } return Mono.error(new AppRuntimeException(ex.getMessage(), ErrorCodes.INTERNAL_SERVER_ERROR, ex)); }); Mono<UserProfile> userProfileMono = userMono.flatMap(u -> { UserProfile userProfile = new UserProfile() .setAddress(userRequest.getAddress()) .setEmail(userRequest.getEmail()) .setMobile(userRequest.getMobile()) .setUser(u); return userProfileRepository.save(userProfile); }).onErrorResume(ex -> { log.error(ex.getMessage()); if(ex instanceof DuplicateKeyException) { String errorMessage = "The user with the profile mobile'"+userRequest.getMobile()+"' and/or - email '"+userRequest.getEmail()+"' already exists"; log.error(errorMessage); return Mono.error(new AppRuntimeException(errorMessage, ErrorCodes.CONFLICT, ex)); } return Mono.error(new AppRuntimeException(ex.getMessage(), ErrorCodes.INTERNAL_SERVER_ERROR, ex)); }); return userProfileMono; } }
控制器和模型类
UserRequest 模型类别
@Getter @Setter @Accessors(chain = true) @Slf4j @JsonInclude(JsonInclude.Include.NON_NULL) public class UserRequest { private String username; private String password; private String mobile; private String email; private String address; private String firstName; private String lastName; }
UserProfileApisController 类
@Slf4j @RestController @RequestMapping("/apis/user/profile") public class UserProfileApisController { @Autowired private UserService userService; @PostMapping public Mono<UserProfile> saveUserProfile(final @RequestBody UserRequest userRequest) { return userService.saveUserAndItsProfile(userRequest); } }