Skip to content

Commit 92202a5

Browse files
turcsanyippvillard31
authored andcommitted
NIFI-9764: Atlas reporting task sends 'unknown' hive_table when table is name not available
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #5839.
1 parent df00cc6 commit 92202a5

File tree

2 files changed

+21
-17
lines changed
  • nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src

2 files changed

+21
-17
lines changed

nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.slf4j.Logger;
2727
import org.slf4j.LoggerFactory;
2828

29+
import java.util.Collections;
2930
import java.util.Set;
3031
import java.util.regex.Matcher;
3132
import java.util.regex.Pattern;
@@ -61,6 +62,8 @@ public class Hive2JDBC extends AbstractHiveAnalyzer {
6162
private static final String URI_PATTERN_STR = "jdbc:hive2://([^/]+)/?(.*)$";
6263
private static final Pattern URI_PATTERN = Pattern.compile(URI_PATTERN_STR);
6364

65+
private static final String UNKNOWN_TABLE = "unknown";
66+
6467
@Override
6568
public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {
6669

@@ -72,7 +75,7 @@ public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event)
7275

7376
final Matcher uriMatcher = URI_PATTERN.matcher(transitUri);
7477
if (!uriMatcher.matches()) {
75-
logger.warn("Unexpected transit URI: {}", new Object[]{transitUri});
78+
logger.warn("Unexpected transit URI: {}", transitUri);
7679
return null;
7780
}
7881

@@ -89,13 +92,20 @@ public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event)
8992
connectedDatabaseName = "default";
9093
}
9194

92-
final Set<Tuple<String, String>> inputTables = parseTableNames(connectedDatabaseName, event.getAttribute(ATTR_INPUT_TABLES));
93-
final Set<Tuple<String, String>> outputTables = parseTableNames(connectedDatabaseName, event.getAttribute(ATTR_OUTPUT_TABLES));
95+
Set<Tuple<String, String>> inputTables = parseTableNames(connectedDatabaseName, event.getAttribute(ATTR_INPUT_TABLES));
96+
Set<Tuple<String, String>> outputTables = parseTableNames(connectedDatabaseName, event.getAttribute(ATTR_OUTPUT_TABLES));
9497

9598
if (inputTables.isEmpty() && outputTables.isEmpty()) {
96-
// If input/output tables are unknown, create database level lineage.
99+
// If input/output tables are unknown, create hive_table entity with name 'unknown' (hive_db is not a DataSet entity and therefore it cannot be used in the lineage).
97100
// Handle case insensitivity of database and table names in Hive: send names uniformly in lower case
98-
return getDatabaseRef(event.getComponentId(), event.getEventType(), namespace, connectedDatabaseName.toLowerCase());
101+
final ProvenanceEventType eventType = event.getEventType();
102+
if (eventType == ProvenanceEventType.RECEIVE || eventType == ProvenanceEventType.FETCH) {
103+
logger.warn("Input table name is missing, defaults to '{}'. Transit URI: {}", UNKNOWN_TABLE, transitUri);
104+
inputTables = Collections.singleton(new Tuple<>(connectedDatabaseName.toLowerCase(), UNKNOWN_TABLE));
105+
} else if (eventType == ProvenanceEventType.SEND) {
106+
logger.warn("Output table name is missing, defaults to '{}'. Transit URI: {}", UNKNOWN_TABLE, transitUri);
107+
outputTables = Collections.singleton(new Tuple<>(connectedDatabaseName.toLowerCase(), UNKNOWN_TABLE));
108+
}
99109
}
100110

101111
final DataSetRefs refs = new DataSetRefs(event.getComponentId());
@@ -104,13 +114,6 @@ public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event)
104114
return refs;
105115
}
106116

107-
private DataSetRefs getDatabaseRef(String componentId, ProvenanceEventType eventType,
108-
String namespace, String databaseName) {
109-
final Referenceable ref = createDatabaseRef(namespace, databaseName);
110-
111-
return singleDataSetRef(componentId, eventType, ref);
112-
}
113-
114117
private void addRefs(DataSetRefs refs, boolean isInput, String namespace,
115118
Set<Tuple<String, String>> tableNames) {
116119
tableNames.forEach(tableName -> {

nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,11 @@ public class TestHive2JDBC {
4545

4646
/**
4747
* If a provenance event does not have table name attributes,
48-
* then a database lineage should be created.
48+
* then a table lineage is created with table name 'unknown'.
49+
* Database lineage cannot be sent to Atlas because hive_db is not a DataSet entity.
4950
*/
5051
@Test
51-
public void testDatabaseLineage() {
52+
public void testUnknownTableLineage() {
5253
final String processorName = "PutHiveQL";
5354
final String transitUri = "jdbc:hive2://0.example.com:10000/database_A";
5455
final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
@@ -69,9 +70,9 @@ public void testDatabaseLineage() {
6970
assertEquals(0, refs.getInputs().size());
7071
assertEquals(1, refs.getOutputs().size());
7172
Referenceable ref = refs.getOutputs().iterator().next();
72-
assertEquals("hive_db", ref.getTypeName());
73-
assertEquals("database_a", ref.get(ATTR_NAME));
74-
assertEquals("database_a@namespace1", ref.get(ATTR_QUALIFIED_NAME));
73+
assertEquals("hive_table", ref.getTypeName());
74+
assertEquals("unknown", ref.get(ATTR_NAME));
75+
assertEquals("database_a.unknown@namespace1", ref.get(ATTR_QUALIFIED_NAME));
7576
}
7677

7778
/**

0 commit comments

Comments
 (0)