我有一个GCS存储桶,其中包含1000个文件夹,在这些文件夹中有嵌套的文件夹
包含数百万个json文件的文件夹结构类似于{bucket_name}/{dir1}/}dir2}/{test.json}每个json文件只包含一个要处理的记录。目前我的管道是这样的。
PCollection<String> records =  p.apply("ReadFromGCS", TextIO.read().from("gs://test_bucket/**/**/**.json")
                .withHintMatchesManyFiles());
PCollection<Document> documents = records.apply("process", ParDo.of(new DoFn<String, Document>() {
            @ProcessElement
            public void processElement(@Element String row, OutputReceiver<Document> out) {
                Document doc;
               try {
                   Gson gson = new Gson();
                   ResearchPaper paper = gson.fromJson(row, Test.class);
                    doc = Document.parse(gson.toJson(paper));
                    doc.append("timestamp", System.currentTimeMillis());
               }
               catch (Exception e) {
                   doc = new Document();
                   doc.append("failed", "true");
                     doc.append("timestamp", System.currentTimeMillis());
                     doc.append("reason", Arrays.toString(e.getStackTrace()));
                     doc.append("original_json", row);
               }
                out.output(doc);
            }
        }));
documents.apply("WriteToMongoDB", MongoDbIO.write()
                .withUri("")
                .withDatabase("testnew")
                .withCollection("test")
                .withBatchSize(1000)
        );
有没有其他有效的方法,通过在DataFlow的第一步中增加风险来加快处理速度?
我正在检查管线是否可以进一步优化。