我有3个级联管道(一个连接到另外两个),描述如下,

> LHSPipe – (较大尺寸)

> RHSPipes – (可能适合内存的较小尺寸)

Psuedocode如下,此示例涉及两个连接

如果F1DecidingFactor = YES则
使用RHS查找#1 BY(LHSPipe.F1Input = RHS查找#1.Join#F1)加入LHSPipe并设置查找结果(SET LHSPipe.F1Output = Result#F1)
除此以外
SET LHSPipe.F1Output = N / A

相同的逻辑适用于F2计算.

预期产量,

IF-ELSE决定是否加入,这种情况迫使我进入自定义加入操作.

考虑到上述情况,我想去MAP-SIDE加入(保持RHSPipe在MAP任务节点的内存中),我正在考虑以下可能的解决方案,每个都有其优缺点.需要你对这些建议.

选项1:

CoGroup – 我们可以使用带有BufferJoiner的CoGroup和自定义连接(操作)来构建自定义连接逻辑,但是不能确保MAP-SIDE连接.

选项2:

HashJoin – 它确保MAP-SIDE加入,但就我所见,无法使用此方式构建自定义连接.

请更正我的理解,并提出您的意见以处理此要求.

提前致谢.

解决方法

解决这个问题的最好方法(我可以想到)是修改你的较小的数据集.您可以向较小的数据集添加新的字段(F1DecidingFactor). F1Result的价值可以如下:

Sudo代码

if F1DecidingFactor == "Yes" then
    F1Result = ACTUAL_VALUE
else
    F1Result = "N/A"

结果表

|F1#Join|F1#Result|F1#DecidingFactor|
|    Yes|        0|             True|
|    Yes|        1|            False|
|     No|        0|              N/A|
|     No|        1|              N/A|

您也可以通过级联进行上述操作.

之后,您可以进行地图侧的加入.

如果修改较小的数据集是不可能的,那么我有2个选项来解决问题.

选项1

将新的字段添加到您的小管道,这相当于您决定的因素(即F1DecidingFactor_RHS =是).然后将其包含在您的加入条件中.一旦你的加入完成,你将只有这些条件匹配的那些行的值.否则将为空/空白.示例代码:

主班

import cascading.operation.Insert;
import cascading.pipe.Each;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.discard;
import cascading.pipe.joiner.LeftJoin;
import cascading.tuple.Fields;

public class StackHashJoinTestOption2 {
    public StackHashJoinTestOption2() {
        Fields f1Input = new Fields("F1Input");
        Fields f2Input = new Fields("F2Input");
        Fields f1Join = new Fields("F1Join");
        Fields f2Join = new Fields("F2Join");

        Fields f1DecidingFactor = new Fields("F1DecidingFactor");
        Fields f2DecidingFactor = new Fields("F2DecidingFactor");
        Fields f1DecidingFactorRhs = new Fields("F1DecidingFactor_RHS");
        Fields f2DecidingFactorRhs = new Fields("F2DecidingFactor_RHS");

        Fields lhsJoinerOne = f1DecidingFactor.append(f1Input);
        Fields lhsJoinerTwo = f2DecidingFactor.append(f2Input);

        Fields rhsJoinerOne = f1DecidingFactorRhs.append(f1Join);
        Fields rhsJoinerTwo = f2DecidingFactorRhs.append(f2Join);

        Fields functionFields = new Fields("F1DecidingFactor","F1Output","F2DecidingFactor","F2Output");

        // Large Pipe fields : 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input
        Pipe largePipe = new Pipe("large-pipe");

        // Small Pipe 1 Fields : 
        // F1Join F1Result
        Pipe rhsOne = new Pipe("small-pipe-1");

        // New field to small pipe. Expected Fields:
        // F1Join F1Result F1DecidingFactor_RHS
        rhsOne = new Each(rhsOne,new Insert(f1DecidingFactorRhs,"Yes"),Fields.ALL);

        // Small Pipe 2 Fields : 
        // F2Join F2Result
        Pipe rhsTwo = new Pipe("small-pipe-2");

        // New field to small pipe. Expected Fields:
        // F2Join F2Result F2DecidingFactor_RHS
        rhsTwo = new Each(rhsTwo,Fields.ALL);

        // Joining first small pipe. Expected fields after join: 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F1DecidingFactor_RHS
        Pipe resultsOne = new HashJoin(largePipe,lhsJoinerOne,rhsOne,rhsJoinerOne,new LeftJoin());

        // Joining second small pipe. Expected fields after join: 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F1DecidingFactor_RHS F2Join F2Result F2DecidingFactor_RHS
        Pipe resultsTwo = new HashJoin(resultsOne,lhsJoinerTwo,rhsTwo,rhsJoinerTwo,new LeftJoin());

        Pipe result = new Each(resultsTwo,functionFields,new TestFunction(),Fields.REPLACE);

        result = new discard(result,f1DecidingFactorRhs);
        result = new discard(result,f2DecidingFactorRhs);

        // result Pipe should have expected result
    }
}

选项2

如果要使用默认值而不是null / blank,那么建议先使用默认的Joiners进行HashJoin,然后使用一个函数来更新具有适当值的元组.就像是:

主班

import cascading.pipe.Each;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.joiner.LeftJoin;
import cascading.tuple.Fields;

public class StackHashJoinTest {
    public StackHashJointest() {
        Fields f1Input = new Fields("F1Input");
        Fields f2Input = new Fields("F2Input");
        Fields f1Join = new Fields("F1Join");
        Fields f2Join = new Fields("F2Join");

        Fields functionFields = new Fields("F1DecidingFactor","F2Output");

        // Large Pipe fields : 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input
        Pipe largePipe = new Pipe("large-pipe");

        // Small Pipe 1 Fields : 
        // F1Join F1Result
        Pipe rhsOne = new Pipe("small-pipe-1");

        // Small Pipe 2 Fields : 
        // F2Join F2Result
        Pipe rhsTwo = new Pipe("small-pipe-2");

        // Joining first small pipe. 
        // Expected fields after join: 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result
        Pipe resultsOne = new HashJoin(largePipe,f1Input,f1Join,new LeftJoin());

        // Joining second small pipe. 
        // Expected fields after join: 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F2Join F2Result
        Pipe resultsTwo = new HashJoin(resultsOne,f2Input,f2Join,Fields.REPLACE);

        // result Pipe should have expected result
    }
}

更新功能

import cascading.flow.FlowProcess;
import cascading.operation.BaSEOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;

public class TestFunction extends BaSEOperation<Void> implements Function<Void> {

    private static final long serialVersionUID = 1L;

    private static final String DECIDING_FACTOR = "No";
    private static final String DEFAULT_VALUE = "N/A";

    // Expected Fields: "F1DecidingFactor","F2Output"
    public TestFunction() {
        super(Fields.ARGS);
    }

    @Override
    public void operate(@SuppressWarnings("rawtypes") FlowProcess process,FunctionCall<Void> call) {
        TupleEntry arguments = call.getArguments();

        TupleEntry result = new TupleEntry(arguments);

        if (result.getString("F1DecidingFactor").equalsIgnoreCase(DECIDING_FACTOR)) {
            result.setString("F1Output",DEFAULT_VALUE);
        }

        if (result.getString("F2DecidingFactor").equalsIgnoreCase(DECIDING_FACTOR)) {
            result.setString("F2Output",DEFAULT_VALUE);
        }

        call.getoutputCollector().add(result);
    }

}

参考

> Insert Function
> Custom Function
> HashJoin

这应该可以解决你的问题.让我知道这是否有帮助.

java – 在级联中构建自定义连接逻辑,仅确保MAP_SIDE的更多相关文章

  1. Django 报错:Broken pipe from ('127.0.0.1', 58924)的解决

    这篇文章主要介绍了Django 报错:Broken pipe from ('127.0.0.1', 58924)的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

  2. Node.js pipe实现源码解析

    这篇文章主要介绍了Node.js pipe实现源码解析,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

  3. Angular2的管道Pipe的使用方法

    本篇文章主要介绍了Angular 2的管道Pipe的使用方法,详细的介绍了管道的定义和使用方法,具有一定的参考价值,有兴趣的可以了解一下

  4. 详解JS中的compose函数和pipe函数用法

    这篇文章主要介绍了JS中的compose函数和pipe函数用法,想深入了解Javascript的同学,可以参考下

  5. Angular管道PIPE的介绍与使用方法

    这篇文章主要给大家介绍了关于Angular管道PIPE的相关资料,管道的作用就是传输,并且不同的管道具有不同的作用,需要的朋友可以参考下

  6. 理解nodejs的stream和pipe机制的原理和实现

    本篇文章主要介绍了理解nodejs的stream和pipe机制的原理和实现,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  7. 过滤嵌套数组值(删除不匹配的值)管道

    我有一个这样的审讯:在一个有角度的管道中,我试图返回嵌套名称与字符串匹配的组。所以在这种情况下,如果我搜索了2,那么应该只返回组b,并且只返回第一个嵌套对象。我的匹配项正在处理无嵌套代码:items?.filter(item=>searchText.split('').every(q=>newRegExp(q,'i').test(item[field]))searchText是我要搜索的单词,field是字段的名称(在本例中为名称)。我以为这样的方法会奏效:但是当它返回正确的组时,实际的嵌套项不会被移除。

  8. 如何构建PHP队列系统

    通过将其添加到服务器启动时启动的事物列表中.不幸的是,这样做的说明因操作系统和操作系统版本而异.您可能希望使用更多跨平台的东西.我有很多运气与supervisor,你可以在你选择的操作系统的包回购中找到.那就是说,你正走在疯狂的路上.你正在做的事情之前已经完成了,更好的是,由令人敬畏的人.查看Gearman工作队列系统和随附的PECLextension.发生这样的情况,主管也非常方便让您的Gearman工作人员保持活力.

  9. 在Windows上具有常量输出的Python无块子进程输入

    我正在尝试使用subproccess和_thread模块运行命令.子进程有一个输出流.为了解决这个问题,我使用了两个线程,一个不断打印新线,另一个是检查输入.当我通过proc.stdin.write(‘Somestring’)传递子进程输入时,它返回1然后我没有输出.沟通不能像我读过的大多数其他问题一样工作,因为它会阻止等待EOF,尽管它确实打印了将要返回的任何内容的第一行.我看到了一些使用’pt

  10. Gnuplot,来自windows.命令窗口打开和关闭

    我有以下,无论我尝试什么命令窗口再次打开和关闭.没有显示图表,也没有写入文件.任何有c使用gnuplot的解决方案的人.我有4.4和4.6rc1可用.以下程序已在Windows上使用VisualStudio和MinGW编译器以及使用gcc的GNU/Linux进行了测试.gnuplot二进制文件必须位于路径上,而在Windows上,必须使用二进制文件的管道pgnuplot版本.我发现Windows管

随机推荐

  1. 基于EJB技术的商务预订系统的开发

    用EJB结构开发的应用程序是可伸缩的、事务型的、多用户安全的。总的来说,EJB是一个组件事务监控的标准服务器端的组件模型。基于EJB技术的系统结构模型EJB结构是一个服务端组件结构,是一个层次性结构,其结构模型如图1所示。图2:商务预订系统的构架EntityBean是为了现实世界的对象建造的模型,这些对象通常是数据库的一些持久记录。

  2. Java利用POI实现导入导出Excel表格

    这篇文章主要为大家详细介绍了Java利用POI实现导入导出Excel表格,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  3. Mybatis分页插件PageHelper手写实现示例

    这篇文章主要为大家介绍了Mybatis分页插件PageHelper手写实现示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  4. (jsp/html)网页上嵌入播放器(常用播放器代码整理)

    网页上嵌入播放器,只要在HTML上添加以上代码就OK了,下面整理了一些常用的播放器代码,总有一款适合你,感兴趣的朋友可以参考下哈,希望对你有所帮助

  5. Java 阻塞队列BlockingQueue详解

    本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景,通过实例代码介绍了Java 阻塞队列BlockingQueue的相关知识,需要的朋友可以参考下

  6. Java异常Exception详细讲解

    异常就是不正常,比如当我们身体出现了异常我们会根据身体情况选择喝开水、吃药、看病、等 异常处理方法。 java异常处理机制是我们java语言使用异常处理机制为程序提供了错误处理的能力,程序出现的错误,程序可以安全的退出,以保证程序正常的运行等

  7. Java Bean 作用域及它的几种类型介绍

    这篇文章主要介绍了Java Bean作用域及它的几种类型介绍,Spring框架作为一个管理Bean的IoC容器,那么Bean自然是Spring中的重要资源了,那Bean的作用域又是什么,接下来我们一起进入文章详细学习吧

  8. 面试突击之跨域问题的解决方案详解

    跨域问题本质是浏览器的一种保护机制,它的初衷是为了保证用户的安全,防止恶意网站窃取数据。那怎么解决这个问题呢?接下来我们一起来看

  9. Mybatis-Plus接口BaseMapper与Services使用详解

    这篇文章主要为大家介绍了Mybatis-Plus接口BaseMapper与Services使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  10. mybatis-plus雪花算法增强idworker的实现

    今天聊聊在mybatis-plus中引入分布式ID生成框架idworker,进一步增强实现生成分布式唯一ID,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

返回
顶部