1.前言
第一篇文章写完之后,我还是对这个 Integration 有些不解,因为网上的资料太少了,翻来覆去的都只是那几张说明。我有一个需求,就是根据不同的 消息去过滤消息,但是虽然 Filter 重载的方法很多,但是具体不知道他的参数到底是什么。
参考文章:
【1】.Spring企业集成流 Spring Integration
2.Filter
这个 Filter 的作用就是过滤是否处理消息,如果返回 true 则继续的处理消息,否则就会丢弃。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| IntegrationFlow .from(ChannelName.INBOUND_OSD) .filter(Message.class,source->{ try { TopicOsdRequest response = Common.getObjectMapper().readValue((byte[]) source.getPayload(),new TypeReference<TopicOsdRequest>() {}); log.info("{}",response.getData()); }catch (Exception e){ log.error(e.getMessage()); } return true; }) .transform(Message.class, source -> { try { TopicOsdRequest response = Common.getObjectMapper().readValue((byte[]) source.getPayload(), new TypeReference<TopicOsdRequest>() { }); String topic = String.valueOf(source.getHeaders().get(MqttHeaders.RECEIVED_TOPIC)); return response.setFrom(topic.substring((THING_MODEL_PRE + PRODUCT).length(), topic.indexOf(OSD_SUF))); } catch (IOException e) { throw new CloudSDKException(e); } }, null) .<TopicOsdRequest>handle((response, headers) -> { String gateway_sn=response.getGateway(); GatewayManager gateway=droneCommon.registerDevice(gateway_sn); if(gateway==null){ gateway=gateway= SDKManager.getDeviceSDK(gateway_sn); } OsdDeviceTypeEnum typeEnum = OsdDeviceTypeEnum.find(gateway.getType(), response.getFrom().equals(response.getGateway())); Map<String, Object> data = (Map<String, Object>) response.getData(); if (!typeEnum.isGateway()) { List payloadData = (List) data.getOrDefault(PayloadModelConst.PAYLOAD_KEY, new ArrayList<>()); PayloadModelConst.getAllIndexWithPosition().stream().filter(data::containsKey) .map(data::get).forEach(payloadData::add); data.put(PayloadModelConst.PAYLOAD_KEY, payloadData); } return response.setData(Common.getObjectMapper().convertValue(data, typeEnum.getClassType())); }) .<TopicOsdRequest, OsdDeviceTypeEnum>route(response -> OsdDeviceTypeEnum.find(response.getData().getClass()), mapping -> Arrays.stream(OsdDeviceTypeEnum.values()).forEach(key -> mapping.channelMapping(key, key.getChannelName()))) .get();
|
参考文章:
【1】.Filter 过滤器置于集成管道之间,它能够根据断言允许或拒绝消息进入流程的下一步.例如,假设消息包含整型的值,它们要通过名为numberChannel的通道进行发布,但是我们只想让偶数进入名为evenNumberChannel的通道。在这种情况下,我们可以使用@Filter注解定义一个过滤器: