Xử lý lỗi cho Apache Beam và BigQuery (Java SDK)


Đặng Ngọc Khang
8 tháng trước
Hữu ích 8 Chia sẻ Viết bình luận 0
Đã xem 3398

Thiết kế đường ống

Giả sử chúng ta có một kịch bản đơn giản: các sự kiện đang phát trực tuyến đến Kafka và chúng ta muốn tiêu thụ các sự kiện trong đường ống của mình, thực hiện một số biến đổi và viết kết quả vào các bảng BigQuery, để cung cấp dữ liệu cho các phân tích. 

Bảng BigQuery có thể được tạo trước khi công việc bắt đầu hoặc chính Beam có thể tạo nó.

Mã sẽ trông khá đơn giản:

EventsProcessingOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                    .as(EventsProcessingOptions.class);

Pipeline p = Pipeline.create(options);

PCollection tableRows =
                  // read kafka topic
                   p.apply("kafka-topic-read", kafkaReader)
                    .apply("kafka-values", MapElements.into(TypeDescriptors.strings())
                            .via(record ->record.getKV().getValue()))
                    // convert value to JsonNode
                    .apply("string-to-json", ParseJsons.of(JsonNode.class))
                    // create TableRow
                    .apply("Build-table-row", ParDo.of(new EventsRowFn()))

                    // save table row to BigQuery
                    .apply("BQ-write", BigQueryIO.<TableRowWithEvent>write()
                            .to(tableSpec)
                            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);

Cái gì còn thiếu?

Trong thế giới thực, lỗi có thể xảy ra và trong hầu hết các tình huống, chúng tôi sẽ được yêu cầu xử lý chúng.

Trong đường ống trên, lỗi có thể xảy ra khi chúng tôi cố phân tích sự kiện từ Kafka thành JsonNode, trong quá trình chuyển đổi và trong giai đoạn chèn BigQuery.

Kế hoạch xử lý lỗi

Đối với mỗi lỗi, chúng tôi sẽ tạo một hàng trong bảng BigQuery khác nhau, chứa nhiều thông tin hơn, như sự kiện gốc từ Kafka.

Khi xảy ra lỗi, chúng ta có thể phân tích bản ghi lỗi và có được một bức tranh đầy đủ về nó.

Sau đó, chúng ta có thể sửa mã đường ống, đặt lại / thay đổi bù trừ nhóm người tiêu dùng Kafka và phát lại các sự kiện một lần nữa, bây giờ với mã cố định.

Chúng tôi cũng có thể tự sửa sự kiện (ví dụ: trong các lỗi phân tích cú pháp JSON) và gửi lại cho Kafka.

Xử lý lỗi chuyển đổi

Chúng ta hãy xem nhanh chức năng biến đổi của chúng ta:

@ProcessElement
public void processElement(@Element JsonNode> element, OutputReceiver<TableRowWithEvent> out) {

  TableRow convertedRow = new TableRow();

  insertLong(element.get("server_time"), "server_time", convertedRow);
  insertFloat(element.get("screen_dpi"), "screen_dpi", convertedRow);

  // more transformation to come

  context.output(output);

}  


private void insertLong(JsonNode value, String key, TableRow convertedRow) {
        String valueToInsert = value.asText();
        if (valueToInsert != null && !valueToInsert.isEmpty()) {
            long longValue = Long.parseLong(valueToInsert);
            convertedRow.set(key, longValue);
        }
}

private void insertFloat(JsonNode value, String key, TableRow convertedRow) {
        String valueToInsert = getStringValue(value);
        if (valueToInsert != null )  {
            float floatValue = Float.parseFloat(valueToInsert);
            convertedRow.set(key, floatValue);
        }
}

Có, chúng tôi có thể thất bại trong quá trình phân tích cú pháp, vì chúng tôi phân tích chuỗi thành Float / Long và điều này không thành công trên dữ liệu không thể chuyển đổi. 

Chúng ta cần loại trừ dữ liệu thất bại khỏi đầu ra chức năng chính và gửi dữ liệu này đến một đường dẫn khác trong đường ống, sau đó chúng ta sẽ lưu chúng vào một bảng lỗi trong BigQuery.

Làm sao? Hãy sử dụng thẻ

Khi chúng ta xuất một phần tử ở cuối  ParDo hàm, chúng ta có thể xuất nó trong một thẻ. Sau đó, chúng ta có thể nhận được tất cả các yếu tố được gắn thẻ theo tên cụ thể và thực hiện một số quy trình trên chúng.

Ở đây chúng tôi sẽ sử dụng hai thẻ, một là thẻ MAIN, chứa tất cả các bản ghi đã thành công và một thẻ chứa tất cả các lỗi với một số ngữ cảnh, chẳng hạn như  DEADLETTER_OUT.

Thẻ chính đó phải cùng loại với loại OUTPUT của  ParDo chính hàm, tất cả các thẻ khác có thể thuộc các loại khác nhau.

Bây giờ, ParDo chức năng của chúng tôi  sẽ trông như thế này (chú ý thêm thẻ):

@ProcessElement
public void processElement(@Element JsonNode> element, OutputReceiver<TableRowWithEvent> out) {

  public static final TupleTag<JsonNode> MAIN_OUT = new TupleTag<JsonNode>() {};
  public static final TupleTag<BigQueryProcessError> DEADLETTER_OUT = new TupleTag<BigQueryProcessError>() {};

  TableRow convertedRow = new TableRow();

  try {

      insertLong(element.get("server_time"), "server_time", convertedRow);
      insertFloat(element.get("screen_dpi"), "screen_dpi", convertedRow);

      // more transformation to come

      context.output(output);

  } catch (Exception e) {
      logger.error("Failed transform "+e.getMessage(),e);
      context.output(DEADLETTER_OUT, new BigQueryProcessError(convertedRow.toString(), e.getMessage(), ERROR_TYPE.BQ_PROCESS, originEvent));

  }  

}  

Và làm thế nào chúng ta có thể xử lý các yếu tố bằng thẻ? Hãy thay đổi đường ống và phân chia. Các  MAIN phần tử sẽ chuyển đến bảng BigQuery và các  DEADLETTER_OUT phần tử sẽ được gửi đến bảng lỗi.

EventsProcessingOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                    .as(EventsProcessingOptions.class);

Pipeline p = Pipeline.create(options);

PCollectionTuple tableRows =
                  // read kafka topic
                   p.apply("kafka-topic-read", kafkaReader)
                    .apply("kafka-values", MapElements.into(TypeDescriptors.strings())
                            .via(record ->record.getKV().getValue()))
                    // convert value to JsonNode
                    .apply("string-to-json", ParseJsons.of(JsonNode.class))
                    // create TableRow
                    .apply("Build-table-row", ParDo.of(new EventsRowFn()).withOutputTags(MAIN_OUT, TupleTagList.of(DEADLETTER_OUT)));


                  // save the MAIN tag to BQ
                  tableRows
                      .get(MAIN_OUT)
                      .apply("BQ-write", BigQueryIO.<TableRowWithEvent>write()
                      .to(tableSpec)
                      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);

                  // save the DEADLETTER_OUT to BQ error table
                  tableRows
                    .get(DEADLETTER_OUT)
                    .apply("BQ-process-error-extract", ParDo.of(new BigQueryProcessErrorExtracFn()))
                    .apply("BQ-process-error-write", BigQueryIO.writeTableRows()
                            .to(errTableSpec)
                            .withJsonSchema(errSchema)
                            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
  p.run();

Xử lý lỗi chèn BigQuery

Để xử lý các lỗi trong khi chèn BigQuery, chúng tôi sẽ phải sử dụng API BiqQueryIO. 

Hãy phóng to giai đoạn viết. và thay đổi nó một chút:

WriteResult writeResult = tableRowToInsertCollection
        .apply("BQ-write", BigQueryIO.write()
        // specify that failed rows will be returned with their error
                .withExtendedErrorInfo()
                .to(tableSpec)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        //Specfies a policy for handling failed inserts.
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));


// write failed rows with their error to error table                
writeResult
        .getFailedInsertsWithErr()
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(5))))
        .apply("BQ-insert-error-extract", ParDo.of(new BigQueryInsertErrorExtractFn(tableRowToInsertView)).withSideInputs(tableRowToInsertView))
        .apply("BQ-insert-error-write", BigQueryIO.writeTableRows()
                .to(errTableSpec)
                .withJsonSchema(errSchema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

Trong đoạn trích trên, chúng tôi nhận được TableRows không thành công với lỗi của họ từ BigQueryIO. Bây giờ chúng ta có thể chuyển đổi chúng thành một cái khác  TableRow và viết chúng vào một bảng lỗi. Trong trường hợp này, chúng tôi cho phép công việc tạo bảng khi cần.

Hữu ích 8 Chia sẻ Viết bình luận 0
Đã xem 3398