我有一个GCS存储桶,其中包含1000个文件夹,在这些文件夹中有嵌套的文件夹
包含数百万个json文件的文件夹结构类似于{bucket_name}/{dir1}/}dir2}/{test.json}每个json文件只包含一个要处理的记录。目前我的管道是这样的。
PCollection<String> records = p.apply("ReadFromGCS", TextIO.read().from("gs://test_bucket/**/**/**.json")
.withHintMatchesManyFiles());
PCollection<Document> documents = records.apply("process", ParDo.of(new DoFn<String, Document>() {
@ProcessElement
public void processElement(@Element String row, OutputReceiver<Document> out) {
Document doc;
try {
Gson gson = new Gson();
ResearchPaper paper = gson.fromJson(row, Test.class);
doc = Document.parse(gson.toJson(paper));
doc.append("timestamp", System.currentTimeMillis());
}
catch (Exception e) {
doc = new Document();
doc.append("failed", "true");
doc.append("timestamp", System.currentTimeMillis());
doc.append("reason", Arrays.toString(e.getStackTrace()));
doc.append("original_json", row);
}
out.output(doc);
}
}));
documents.apply("WriteToMongoDB", MongoDbIO.write()
.withUri("")
.withDatabase("testnew")
.withCollection("test")
.withBatchSize(1000)
);
有没有其他有效的方法,通过在DataFlow的第一步中增加风险来加快处理速度?
我正在检查管线是否可以进一步优化。